From cfeec40d296756e73d8a66f997688b2e33ac0eb2 Mon Sep 17 00:00:00 2001 From: Sergio Bobillier Date: Fri, 8 Aug 2025 16:37:04 +0200 Subject: [PATCH 1/6] [JAY-676] Move code from Index to Indexable Creates the Indexable mixin and moves much of the code from the Elasticsearch::Index class there. These methods can actually be used with a single index or with multiple indexes. In an upcoming commit these methods will be used by another class which will allow multiple indexes to be searched. Data could also be pushed to multiple indexes at the same time. --- lib/jay_api/elasticsearch/index.rb | 214 +---------------------- lib/jay_api/elasticsearch/indexable.rb | 228 +++++++++++++++++++++++++ 2 files changed, 231 insertions(+), 211 deletions(-) create mode 100644 lib/jay_api/elasticsearch/indexable.rb diff --git a/lib/jay_api/elasticsearch/index.rb b/lib/jay_api/elasticsearch/index.rb index 96b7a38..8e1a52b 100644 --- a/lib/jay_api/elasticsearch/index.rb +++ b/lib/jay_api/elasticsearch/index.rb @@ -1,223 +1,15 @@ # frozen_string_literal: true -require 'active_support' -require 'active_support/json' # Needed because ActiveSupport 6 doesn't include it's own JSON module. 🤦 -require 'active_support/core_ext/hash/indifferent_access' -require 'active_support/core_ext/hash/keys' -require 'elasticsearch' -require 'logging' - -require_relative 'errors/elasticsearch_error' -require_relative 'async' -require_relative 'query_results' -require_relative 'response' -require_relative 'batch_counter' -require_relative 'search_after_results' +require_relative 'indexable' module JayAPI module Elasticsearch # Represents an Elasticsearch index. Allows data to be pushed to it one # record at a time or in batches of the specified size. class Index - attr_reader :client, :index_name, :batch_size - - # Default type for documents indexed with the #index method. 
- DEFAULT_DOC_TYPE = 'nested' - - # Supported document types (for the #index method) - SUPPORTED_TYPES = [DEFAULT_DOC_TYPE, nil].freeze - - # :reek:ControlParameter (want to avoid the creating of the logger on method definition) - # Creates a new instance of the class. - # @param [JayAPI::Elasticsearch::Client] client The Elasticsearch Client object. - # @param [String] index_name The name of the Elasticsearch index. - # @param [Integer] batch_size The size of the batch. When this number of - # items are pushed into the index they are flushed to the - # Elasticsearch instance. - # @param [Logging::Logger, nil] logger The logger object to use, if - # none is given a new one will be created. - def initialize(client:, index_name:, batch_size: 100, logger: nil) - @logger = logger || Logging.logger[self] - - @client = client - @index_name = index_name - @batch_size = batch_size - - @batch = [] - end - - # Pushes a record into the index. (This does not send the record to the - # Elasticsearch instance, only puts it into the send queue). - # @param [Hash] data The data to be pushed to the index. - def push(data) - @batch << { index: { _index: index_name, _type: 'nested', data: data } } - flush! if @batch.size >= batch_size - end - - # Sends a record to the Elasticsearch instance right away. - # @param [Hash] data The data to be sent. - # @param [String, nil] type The type of the document. When set to +nil+ - # the decision is left to Elasticsearch's API. Which will normally - # default to +_doc+. - # @return [Hash] A hash with information about the created document. 
An - # example of such Hash is: - # - # { - # "_index" => "xyz01_unit_test", - # "_type" => "nested", - # "_id" => "SVY1mJEBQ5CNFZM8Lodt", - # "_version" => 1, - # "result" => "created", - # "_shards" => { "total" => 2, "successful" => 1, "failed" => 0 }, - # "_seq_no" => 0, - # "_primary_term" => 1 - # } - # - # For information on the contents of this Hash please see: - # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#docs-index-api-response-body - def index(data, type: DEFAULT_DOC_TYPE) - raise ArgumentError, "Unsupported type: '#{type}'" unless SUPPORTED_TYPES.include?(type) - - client.index index: index_name, type: type, body: data - end - - # Performs a query on the index. - # For more information on how to build the query please refer to the - # Elasticsearch DSL query: - # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html - # @param [Hash] query The query to perform. - # @param [JayAPI::Elasticsearch::BatchCounter, nil] batch_counter Object keeping track of batches. - # @param [Symbol, nil] type Type of query, at the moment either nil or :search_after. - # @return [JayAPI::Elasticsearch::QueryResults] The query results. - # @raise [Elasticsearch::Transport::Transport::ServerError] If the - # query fails. - def search(query, batch_counter: nil, type: nil) - begin - response = Response.new(client.search(index: index_name, body: query)) - rescue ::Elasticsearch::Transport::Transport::Errors::BadRequest - logger.error "The 'search' query is invalid: #{JSON.pretty_generate(query)}" - raise - end - query_results(query, response, batch_counter, type) - end - - # Sends whatever is currently in the send queue to the Elasticsearch - # instance and clears the queue. - def flush - return unless @batch.any? - - flush! - end - - # Returns the number of elements currently on the send queue. - # @return [Integer] The number of items in the send queue. 
- def queue_size - @batch.size - end - - # Delete the documents matching the given query from the Index. - # For more information on how to build the query please refer to the - # Elasticsearch DSL documentation: - # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html - # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html - # @param [Hash] query The delete query - # @param [Integer] slices Number of slices to cut the operation into for - # faster processing (i.e., run the operation in parallel) - # @param [Boolean] wait_for_completion False if Elasticsearch should not - # wait for completion and perform the request asynchronously, true if it - # should wait for completion (i.e., run the operation asynchronously) - # @return [Hash] A Hash that details the results of the operation - # @example Returned Hash (with `wait_for_completion: true`): - # { - # took: 103, - # timed_out: false, - # total: 76, - # deleted: 76, - # batches: 1, - # version_conflicts: 0, - # noops: 0, - # retries: { bulk: 0, search: 0 }, - # throttled_millis: 0, - # requests_per_second: 1.0, - # throttled_until_millis: 0, - # failures: [] - # } - # @example Returned Hash (with `wait_for_completion: false`): - # { - # task: "B5oDyEsHQu2Q-wpbaMSMTg:577388264" - # } - # @raise [Elasticsearch::Transport::Transport::ServerError] If the - # query fails. - def delete_by_query(query, slices: nil, wait_for_completion: true) - request_params = { index: index_name, body: query }.tap do |params| - params.merge!(slices: slices) if slices - params.merge!(wait_for_completion: false) unless wait_for_completion - end - - client.delete_by_query(**request_params).deep_symbolize_keys - end - - # Deletes asynchronously the documents matching the given query from the - # Index. 
- # For more information on how to build the query please refer to the - # Elasticsearch DSL documentation: - # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html - # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html - # @param [Hash] query The delete query - # @param [Integer, String] slices Number of slices to cut the operation - # into for faster processing (i.e., run the operation in parallel). Use - # "auto" to make elasticsearch decide how many slices to divide into - # @return [Concurrent::Promise] The eventual value returned from the single - # completion of the delete operation - def delete_by_query_async(query, slices: nil) - async.delete_by_query(query, slices: slices) - end - - private - - attr_reader :logger, :batch - - # Flushes the current send queue to the Elasticsearch instance and - # clears the queue. - def flush! - logger.info "Pushing data to Elasticsearch (#{batch.size} records)..." - response = client.bulk body: batch - - handle_errors(response) if response['errors'] - - logger.info 'Done' - @batch = [] - end - - # @param [Hash] query The elastic search query. - # @param [JayAPI::Elasticsearch::Response] response The response to the query. - # @param [JayAPI::Elasticsearch::BatchCounter, nil] batch_counter Object keeping track of batches. - # @param [Symbol, nil] type Type of query, at the moment either nil or :search_after. - # @return [QueryResults] - def query_results(query, response, batch_counter, type) - (type == :search_after ? SearchAfterResults : QueryResults).new( - index: self, - query: query, - response: response, - batch_counter: BatchCounter.create_or_update(batch_counter, query, response.size) - ) - end - - # Scans the Elasticsearch response in search for the first item that has - # en erroneous state and raises an error including the error details. - # @param [Hash] response The response returned by the Elasticsearch client. 
- # @raise [Errors::ElasticsearchError] Is always raised. - def handle_errors(response) - error_item = response['items'].find { |item| item['index']['error'] } - - raise Errors::ElasticsearchError, - "An error occurred when pushing the data to Elasticsearch:\n#{error_item['index']['error'].inspect}" - end + include ::JayAPI::Elasticsearch::Indexable - # @return [JayAPI::Elasticsearch::Async] - def async - @async ||= JayAPI::Elasticsearch::Async.new(self) - end + attr_reader :index_name end end end diff --git a/lib/jay_api/elasticsearch/indexable.rb b/lib/jay_api/elasticsearch/indexable.rb new file mode 100644 index 0000000..c5c82cb --- /dev/null +++ b/lib/jay_api/elasticsearch/indexable.rb @@ -0,0 +1,228 @@ +# frozen_string_literal: true + +require 'active_support' +require 'active_support/json' # Needed because ActiveSupport 6 doesn't include it's own JSON module. 🤦 +require 'active_support/core_ext/hash/keys' +require 'elasticsearch' +require 'logging' + +require_relative 'async' +require_relative 'batch_counter' +require_relative 'errors/elasticsearch_error' +require_relative 'query_results' +require_relative 'response' +require_relative 'search_after_results' + +module JayAPI + module Elasticsearch + # This module houses the Elasticsearch methods that can be used with a + # single or with multiple indexes. Its main purpose is to avoid code + # repetition between classes. + module Indexable + # Default type for documents indexed with the #index method. + DEFAULT_DOC_TYPE = 'nested' + + # Supported document types (for the #index method) + SUPPORTED_TYPES = [DEFAULT_DOC_TYPE, nil].freeze + + attr_reader :client, :batch_size + + # :reek:ControlParameter (want to avoid the creating of the logger on method definition) + # @param [JayAPI::Elasticsearch::Client] client The Elasticsearch Client object. + # @param [String] index_name The name of the Elasticsearch index. + # @param [Integer] batch_size The size of the batch. 
When this many items + # are pushed into the index they are flushed to the Elasticsearch + # instance. + # @param [Logging::Logger, nil] logger The logger object to use, if + # none is given a new one will be created. + def initialize(client:, index_name:, batch_size: 100, logger: nil) + @logger = logger || Logging.logger[self] + + @client = client + @index_name = index_name + @batch_size = batch_size + + @batch = [] + end + + # Pushes a record into the index. (This does not send the record to the + # Elasticsearch instance, only puts it into the send queue). + # @param [Hash] data The data to be pushed to the index. + def push(data) + batch << { index: { _index: index_name, _type: 'nested', data: data } } + flush! if batch.size >= batch_size + end + + # Sends a record to the Elasticsearch instance right away. + # @param [Hash] data The data to be sent. + # @param [String, nil] type The type of the document. When set to +nil+ + # the decision is left to Elasticsearch's API. Which will normally + # default to +_doc+. + # @return [Hash] A hash with information about the created document. An + # example of such Hash is: + # + # { + # "_index" => "xyz01_unit_test", + # "_type" => "nested", + # "_id" => "SVY1mJEBQ5CNFZM8Lodt", + # "_version" => 1, + # "result" => "created", + # "_shards" => { "total" => 2, "successful" => 1, "failed" => 0 }, + # "_seq_no" => 0, + # "_primary_term" => 1 + # } + # + # For information on the contents of this Hash please see: + # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#docs-index-api-response-body + def index(data, type: DEFAULT_DOC_TYPE) + raise ArgumentError, "Unsupported type: '#{type}'" unless SUPPORTED_TYPES.include?(type) + + client.index index: index_name, type: type, body: data + end + + # Performs a query on the index. 
+ # For more information on how to build the query please refer to the + # Elasticsearch DSL query: + # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html + # @param [Hash] query The query to perform. + # @param [JayAPI::Elasticsearch::BatchCounter, nil] batch_counter Object keeping track of batches. + # @param [Symbol, nil] type Type of query, at the moment either nil or :search_after. + # @return [JayAPI::Elasticsearch::QueryResults] The query results. + # @raise [Elasticsearch::Transport::Transport::ServerError] If the + # query fails. + def search(query, batch_counter: nil, type: nil) + begin + response = Response.new(client.search(index: index_name, body: query)) + rescue ::Elasticsearch::Transport::Transport::Errors::BadRequest + logger.error "The 'search' query is invalid: #{JSON.pretty_generate(query)}" + raise + end + query_results(query, response, batch_counter, type) + end + + # Sends whatever is currently in the send queue to the Elasticsearch + # instance and clears the queue. + def flush + return unless @batch.any? + + flush! + end + + # Returns the number of elements currently on the send queue. + # @return [Integer] The number of items in the send queue. + def queue_size + batch.size + end + + # Delete the documents matching the given query from the Index. 
+ # For more information on how to build the query please refer to the + # Elasticsearch DSL documentation: + # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html + # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html + # @param [Hash] query The delete query + # @param [Integer] slices Number of slices to cut the operation into for + # faster processing (i.e., run the operation in parallel) + # @param [Boolean] wait_for_completion False if Elasticsearch should not + # wait for completion and perform the request asynchronously, true if it + # should wait for completion (i.e., run the operation synchronously) + # @return [Hash] A Hash that details the results of the operation + # @example Returned Hash (with `wait_for_completion: true`): + # { + # took: 103, + # timed_out: false, + # total: 76, + # deleted: 76, + # batches: 1, + # version_conflicts: 0, + # noops: 0, + # retries: { bulk: 0, search: 0 }, + # throttled_millis: 0, + # requests_per_second: 1.0, + # throttled_until_millis: 0, + # failures: [] + # } + # @example Returned Hash (with `wait_for_completion: false`): + # { + # task: "B5oDyEsHQu2Q-wpbaMSMTg:577388264" + # } + # @raise [Elasticsearch::Transport::Transport::ServerError] If the + # query fails. + def delete_by_query(query, slices: nil, wait_for_completion: true) + request_params = { index: index_name, body: query }.tap do |params| + params.merge!(slices: slices) if slices + params.merge!(wait_for_completion: false) unless wait_for_completion + end + + client.delete_by_query(**request_params).deep_symbolize_keys + end + + # Deletes asynchronously the documents matching the given query from the + # Index. 
+ # For more information on how to build the query please refer to the + # Elasticsearch DSL documentation: + # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html + # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html + # @param [Hash] query The delete query + # @param [Integer, String] slices Number of slices to cut the operation + # into for faster processing (i.e., run the operation in parallel). Use + # "auto" to make Elasticsearch decide how many slices to divide into + # @return [Concurrent::Promise] The eventual value returned from the single + # completion of the delete operation + def delete_by_query_async(query, slices: nil) + async.delete_by_query(query, slices: slices) + end + + private + + attr_reader :logger, :batch + + # @raise [NotImplementedError] Is always raised, class including the + # module **must** implement this method. + def index_name + raise NotImplementedError, "Please implement the method #{__method__} in #{self.class}" + end + + # Scans the Elasticsearch response in search for the first item that has + # an erroneous state and raises an error including the error details. + # @param [Hash] response The response returned by the Elasticsearch client. + # @raise [Errors::ElasticsearchError] Is always raised. + def handle_errors(response) + error_item = response['items'].find { |item| item['index']['error'] } + + raise Errors::ElasticsearchError, + "An error occurred when pushing the data to Elasticsearch:\n#{error_item['index']['error'].inspect}" + end + + # Flushes the current send queue to the Elasticsearch instance and + # clears the queue. + def flush! + logger.info "Pushing data to Elasticsearch (#{batch.size} records)..." + response = client.bulk body: batch + + handle_errors(response) if response['errors'] + + logger.info 'Done' + @batch = [] + end + + # @param [Hash] query The elastic search query. 
+ # @param [JayAPI::Elasticsearch::Response] response The response to the query. + # @param [JayAPI::Elasticsearch::BatchCounter, nil] batch_counter Object keeping track of batches. + # @param [Symbol, nil] type Type of query, at the moment either nil or :search_after. + # @return [QueryResults] + def query_results(query, response, batch_counter, type) + (type == :search_after ? SearchAfterResults : QueryResults).new( + index: self, + query: query, + response: response, + batch_counter: BatchCounter.create_or_update(batch_counter, query, response.size) + ) + end + + # @return [JayAPI::Elasticsearch::Async] + def async + @async ||= JayAPI::Elasticsearch::Async.new(self) + end + end + end +end From 068587b4798494c8279e3e9da6c2ad0badc4f8d5 Mon Sep 17 00:00:00 2001 From: Sergio Bobillier Date: Fri, 8 Aug 2025 17:08:25 +0200 Subject: [PATCH 2/6] [JAY-676] Move common test code to a shared file Moves the test code for the methods that were moved to the Indexable mixin to a shared examples file. This test code will be used in an upcoming commit in order to test other classes that include the mixin. Note: Some changes were made to the test code in order to allow it to work in a shared_examples / shared_context block. These changes are mainly inconsequential, for example, moving the parameters of a method call to a separate let. 
--- spec/jay_api/elasticsearch/index_spec.rb | 563 ++---------------- .../jay_api/elasticsearch/indexable_shared.rb | 506 ++++++++++++++++ 2 files changed, 569 insertions(+), 500 deletions(-) create mode 100644 spec/jay_api/elasticsearch/indexable_shared.rb diff --git a/spec/jay_api/elasticsearch/index_spec.rb b/spec/jay_api/elasticsearch/index_spec.rb index e8b972c..9fc6f88 100644 --- a/spec/jay_api/elasticsearch/index_spec.rb +++ b/spec/jay_api/elasticsearch/index_spec.rb @@ -2,18 +2,18 @@ require 'jay_api/elasticsearch/index' +require_relative 'indexable_shared' + RSpec.describe JayAPI::Elasticsearch::Index do - subject(:index) { described_class.new(**params) } + subject(:index) { described_class.new(**constructor_params) } - let(:base_params) do + let(:constructor_params) do { index_name: index_name, - client: mocked_elasticsearch + client: client } end - let(:params) { base_params } - let(:index_name) { 'elite_unit_tests' } let(:successful_response) do @@ -36,99 +36,30 @@ } end - let(:response) { {} } - let(:mocked_elasticsearch) do - instance_double( - JayAPI::Elasticsearch::Client, - bulk: successful_response, - search: response - ) - end - - # rubocop:disable RSpec/VerifiedDoubles (Does not work with Logging because of meta-programming) - let(:mocked_logger) do - double( - Logging::Logger, - info: true, - error: true - ) - end - # rubocop:enable RSpec/VerifiedDoubles - let(:data) { { type: 'example', name: 'Test data example' } } - before do - allow(Logging.logger).to receive(:[]).and_return(mocked_logger) - end + include_context 'with mocked objects for Elasticsearch::Indexable' describe '#initialize' do subject(:method_call) { index } - context 'when no logger has been given' do - it 'creates a Logger for the class' do - expect(Logging.logger).to receive(:[]).with(described_class) - method_call - end - end - - context 'when a logger has been given' do - let(:params) do - base_params.update(logger: mocked_logger) - end - - it 'does not create a new logger' 
do - expect(Logging.logger).not_to receive(:[]) - method_call - end - end + it_behaves_like 'Indexable#initialize' end describe '#push' do - let(:batch_size) { 2 } - - let(:expected_data) do - { - body: [ - { - index: { - _index: index_name, - _type: 'nested', - data: data - } - } - ] * 2 - } - end - - let(:params) do - base_params.update(batch_size: batch_size) - end - - it 'puts the data with the correct structure in the queue' do - expect(mocked_elasticsearch).to receive(:bulk).with(expected_data) - 2.times { index.push(data) } - end - - context 'when the amount of data is smaller than the batch size' do - let(:batch_size) { 100 } - - it 'enqueues the data but does not push it to Elasticsearch' do - expect(mocked_elasticsearch).not_to receive(:bulk) - 10.times { index.push(data) } - end - end + subject(:method_call) { described_method.call } - context 'when the amount of data goes over the batch size' do - let(:batch_size) { 5 } + # Needed in order to be able to repeat the method call + let(:described_method) { -> { index.push(data) } } - it 'pushes the data to Elasticsearch every time the batch size is hit' do - expect(mocked_elasticsearch).to receive(:bulk).exactly(3).times - 15.times { index.push(data) } - end - end + it_behaves_like 'Indexable#push' end describe '#index' do + subject(:method_call) { index.index(data, **method_params) } + + let(:method_params) { {} } + let(:successful_response) do { '_index' => 'xyz01_unit_test', @@ -142,186 +73,63 @@ } end - before do - allow(mocked_elasticsearch).to receive(:index).and_return(successful_response) - end - - shared_examples_for '#index when no type is specified' do - let(:expected_data) do - { - index: index_name, - type: 'nested', - body: data - } - end - - it 'sends the given data to Elasticsearch right away (with the "nested" type)' do - expect(mocked_elasticsearch).to receive(:index).with(expected_data) - method_call - end - - it 'returns the expected Hash' do - expect(method_call).to 
eq(successful_response) - end - end - - context 'when no type is specified' do - subject(:method_call) { index.index(data) } - - it_behaves_like '#index when no type is specified' - end - - context 'when type is specified as "nested"' do - subject(:method_call) { index.index(data, type: 'nested') } - - it_behaves_like '#index when no type is specified' - end - - context 'when type is set to nil' do - subject(:method_call) { index.index(data, type: nil) } - - let(:expected_data) do - { - index: index_name, - type: nil, - body: data - } - end - - it "sends the given data to Elasticsearch right away (with 'type' set to nil)" do - expect(mocked_elasticsearch).to receive(:index).with(expected_data) - method_call - end - - it 'returns the expected Hash' do - expect(method_call).to eq(successful_response) - end - end - - context 'when type is set to an invalid value' do - subject(:method_call) { index.index(data, type: 'flatten') } - - it 'raises an ArgumentError' do - expect { method_call }.to raise_error(ArgumentError, "Unsupported type: 'flatten'") - end - end + it_behaves_like 'Indexable#index' end describe '#queue_size' do subject(:method_call) { index.queue_size } - let(:params) do - base_params.update(batch_size: 15) - end - - context 'with multiple items in the queue' do - before do - 10.times { index.push(data) } - end - - it 'returns the correct number of items' do - expect(method_call).to eq(10) - end - end + let(:indexable) { index } - context 'with no items in the queue' do - it 'returns 0' do - expect(method_call).to eq(0) - end - end - - context 'when an item is pushed to the index' do - it 'increases accordingly' do - expect { index.push(data) }.to change(index, :queue_size).by(1) - end - end + it_behaves_like 'Indexable#queue_size' end describe '#flush' do subject(:method_call) { index.flush } - let(:params) do - base_params.update(batch_size: 5) - end + let(:indexable) { index } - context 'when there is no data' do - it 'does not try to push any data' do 
- expect(mocked_elasticsearch).not_to receive(:bulk) - method_call - end - end - - context 'when there is data in the buffer' do - before { 3.times { index.push(data) } } - - it 'pushes the data currently held in the queue to Elasticsearch' do - expect(mocked_elasticsearch).to receive(:bulk).once - method_call - end - - it 'clears the queue' do - expect { method_call }.to change(index, :queue_size).from(3).to(0) - end - - context 'when there is an error on the Elasticsearch instance' do - let(:error_response) do + let(:error_response) do + { + 'took' => 9, + 'errors' => true, + 'items' => [ { - 'took' => 9, - 'errors' => true, - 'items' => [ - { - 'index' => { - '_index' => index_name, - '_type' => 'nested', - '_id' => 'EExOS4cBrb-mrkbmwAGg', - 'result' => 'created', - '_shards' => { - 'total' => 2, - 'successful' => 1, - 'failed' => 0 - }, - 'status' => 201 - } + 'index' => { + '_index' => index_name, + '_type' => 'nested', + '_id' => 'EExOS4cBrb-mrkbmwAGg', + 'result' => 'created', + '_shards' => { + 'total' => 2, + 'successful' => 1, + 'failed' => 0 }, - { - 'index' => { - '_index' => index_name, - '_type' => 'nested', - '_id' => 'fnKi628BVvaJMVLXJxty', - 'status' => 400, - 'error' => { - 'type' => 'illegal_argument_exception', - 'reason' => 'mapper [test_env.report_meta.SUT tags.cluster] ' \ - 'of different type, current_type [long], merged_type [text]' - } - } + 'status' => 201 + } + }, + { + 'index' => { + '_index' => index_name, + '_type' => 'nested', + '_id' => 'fnKi628BVvaJMVLXJxty', + 'status' => 400, + 'error' => { + 'type' => 'illegal_argument_exception', + 'reason' => 'mapper [test_env.report_meta.SUT tags.cluster] ' \ + 'of different type, current_type [long], merged_type [text]' } - ] + } } - end - - let(:expected_message) do - "An error occurred when pushing the data to Elasticsearch:\n" \ - '{"type"=>"illegal_argument_exception", "reason"=>' \ - '"mapper [test_env.report_meta.SUT tags.cluster] of ' \ - 'different type, current_type [long], 
merged_type [text]"}' - end - - before do - allow(mocked_elasticsearch).to receive(:bulk).and_return(error_response) - end - - it 'raises an error' do - expect { method_call }.to raise_error( - JayAPI::Elasticsearch::Errors::ElasticsearchError, expected_message - ) - end - end + ] + } end + + it_behaves_like 'Indexable#flush' end describe '#search' do - subject(:method_call) { index.search(query, batch_counter: batch_counter) } + subject(:method_call) { index.search(query, **method_params) } let(:query) do { @@ -341,6 +149,10 @@ } end + let(:method_params) { { batch_counter: batch_counter } } + + let(:indexable) { index } + let(:expected_query) do { index: index_name, @@ -348,112 +160,13 @@ } end - let(:response) do - { - 'took' => 1, - 'timed_out' => false, - '_shards' => { - 'total' => 5, - 'successful' => 5, - 'skipped' => 0, - 'failed' => 0 - }, - 'hits' => { - 'total' => { - 'value' => 33, - 'relation' => 'eq' - }, - 'max_score' => nil, - 'hits' => [] - } - } - end - - let(:response_object) { instance_double(JayAPI::Elasticsearch::Response, size: 33) } - let(:batch_counter) { instance_double(JayAPI::Elasticsearch::BatchCounter) } - - before do - allow(JayAPI::Elasticsearch::QueryResults).to receive(:new) - allow(JayAPI::Elasticsearch::Response).to receive(:new).with(response).and_return(response_object) - allow(JayAPI::Elasticsearch::BatchCounter).to receive(:create_or_update).with(batch_counter, query, 33) - .and_return(batch_counter) - end - - it 'sends the expected query to Elasticsearch' do - expect(mocked_elasticsearch).to receive(:search).with(expected_query) - method_call - end - - context 'when Elasticsearch responds with an error' do - let(:expected_error) { Elasticsearch::Transport::Transport::Errors::BadRequest } - - let(:expected_log) do - <<~TEXT - The 'search' query is invalid: { - "query": { - "query_string": { - "fields": [ - "test_case.identifier" - ], - "query": "\\"Elite::Tools::Jay::ElasticsearchIndex/#push/Puts the data with the correct 
structure in the queue\\"" - } - }, - "sort": [ - { - "test_case.finished_at.keyword": { - "order": "desc" - } - } - ] - } - TEXT - end - - before do - allow(mocked_elasticsearch).to receive(:search) - .and_raise(expected_error) - end - - it 'raises the error to the caller' do - expect { method_call }.to raise_error(expected_error) - end - - it 'logs the query which caused the error message' do - expect(mocked_logger).to receive(:error).with(expected_log.strip) - - expect { method_call }.to raise_error(expected_error) - end - end - - context 'when Elasticsearch responds with a valid response' do - context "without 'search_after' type parameter in the options" do - it 'creates a new instance of the QueryResults class and passes the response' do - expect(JayAPI::Elasticsearch::QueryResults) - .to receive(:new).with(index: index, query: query, response: response_object, batch_counter: batch_counter) - - method_call - end - end - - context "with 'search_after' type parameter in the options" do - subject(:method_call) { index.search(query, batch_counter: batch_counter, type: :search_after) } - - before do - allow(JayAPI::Elasticsearch::SearchAfterResults).to receive(:new) - end - - it 'creates a new instance of the SearchAfterResults class and passes the response' do - expect(JayAPI::Elasticsearch::SearchAfterResults) - .to receive(:new).with(index: index, query: query, response: response_object, batch_counter: batch_counter) - - method_call - end - end - end + it_behaves_like 'Indexable#search' end describe '#delete_by_query' do - subject(:method_call) { index.delete_by_query(query) } + subject(:method_call) { index.delete_by_query(query, **method_params) } + + let(:method_params) { {} } let(:query) do { @@ -472,132 +185,7 @@ } end - let(:successful_response) do - { - 'took' => 103, - 'timed_out' => false, - 'total' => 76, - 'deleted' => 76, - 'batches' => 1, - 'version_conflicts' => 0, - 'noops' => 0, - 'retries' => { 'bulk' => 0, 'search' => 0 }, - 'throttled_millis' 
=> 0, - 'requests_per_second' => 1.0, - 'throttled_until_millis' => 0, - 'failures' => [] - } - end - - before do - allow(mocked_elasticsearch).to receive(:delete_by_query).and_return(successful_response) - end - - it 'relays the command to the Elasticsearch client' do - expect(mocked_elasticsearch).to receive(:delete_by_query).with( - index: index_name, body: query - ) - - method_call - end - - context 'when a custom number of slices is to be used' do - subject(:method_call) { index.delete_by_query(query, slices: 5) } - - it 'relays the command to the Elasticsearch client' do - expect(mocked_elasticsearch).to receive(:delete_by_query).with( - index: index_name, body: query, slices: 5 - ) - - method_call - end - end - - context 'when the client should not wait for completion' do - subject(:method_call) { index.delete_by_query(query, wait_for_completion: false) } - - it 'relays the command to the Elasticsearch client' do - expect(mocked_elasticsearch).to receive(:delete_by_query).with( - index: index_name, body: query, wait_for_completion: false - ) - - method_call - end - end - - context 'when the client should not wait for completion and should use a custom number of slices' do - subject(:method_call) { index.delete_by_query(query, slices: 5, wait_for_completion: false) } - - it 'relays the command to the Elasticsearch client' do - expect(mocked_elasticsearch).to receive(:delete_by_query).with( - index: index_name, body: query, slices: 5, wait_for_completion: false - ) - - method_call - end - end - - context 'when the deletion succeeds' do - context 'when the deletion has been executed synchronously (i.e., `wait_for_completion` is `true`)' do - let(:expected_hash) do - { - took: 103, - timed_out: false, - total: 76, - deleted: 76, - batches: 1, - version_conflicts: 0, - noops: 0, - retries: { bulk: 0, search: 0 }, - throttled_millis: 0, - requests_per_second: 1.0, - throttled_until_millis: 0, - failures: [] - } - end - - it 'returns the expected Hash' do - 
expect(method_call).to eq(expected_hash) - end - end - - context 'when the deletion has been executed asynchronously (i.e., `wait_for_completion` is `false`)' do - subject(:method_call) { index.delete_by_query(query, wait_for_completion: false) } - - let(:successful_response) do - { - 'task' => 'B5oDyEsHQu2Q-wpbaMSMTg:577388264' - } - end - - let(:expected_hash) do - { - task: 'B5oDyEsHQu2Q-wpbaMSMTg:577388264' - } - end - - it 'returns the expected Hash' do - expect(method_call).to eq(expected_hash) - end - end - end - - context 'when the deletion fails' do - let(:error) do - [ - Elasticsearch::Transport::Transport::Errors::Unauthorized, - '[401] Unauthorized' - ] - end - - before do - allow(mocked_elasticsearch).to receive(:delete_by_query).and_raise(*error) - end - - it 're-raises the error' do - expect { method_call }.to raise_error(*error) - end - end + it_behaves_like 'Indexable#delete_by_query' end describe '#delete_by_query_async' do @@ -613,33 +201,8 @@ } end - let(:async) do - instance_double( - JayAPI::Elasticsearch::Async, - delete_by_query: response - ) - end - - let(:response) do - {} - end + let(:indexable) { index } - before do - allow(JayAPI::Elasticsearch::Async).to receive(:new).and_return(async) - end - - it 'creates the expected Async object' do - expect(JayAPI::Elasticsearch::Async).to receive(:new).with(index) - method_call - end - - it 'makes the Async object delete by query' do - expect(async).to receive(:delete_by_query).with(query, slices: 5) - method_call - end - - it 'returns what the Async object returns when deleting by query' do - expect(method_call).to eq(response) - end + it_behaves_like 'Indexable#delete_by_query_async' end end diff --git a/spec/jay_api/elasticsearch/indexable_shared.rb b/spec/jay_api/elasticsearch/indexable_shared.rb new file mode 100644 index 0000000..bae8e77 --- /dev/null +++ b/spec/jay_api/elasticsearch/indexable_shared.rb @@ -0,0 +1,506 @@ +# frozen_string_literal: true + +RSpec.shared_context 'with mocked 
objects for Elasticsearch::Indexable' do + let(:response) { {} } + + let(:client) do + instance_double( + JayAPI::Elasticsearch::Client, + bulk: successful_response, + search: response + ) + end + + # rubocop:disable RSpec/VerifiedDoubles (Does not work with Logging because of meta-programming) + let(:mocked_logger) do + double( + Logging::Logger, + info: true, + error: true + ) + end + # rubocop:enable RSpec/VerifiedDoubles + + before do + allow(Logging.logger).to receive(:[]).and_return(mocked_logger) + end +end + +RSpec.shared_examples_for 'Indexable#initialize' do + context 'when no logger has been given' do + it 'creates a Logger for the class' do + expect(Logging.logger).to receive(:[]).with(described_class) + method_call + end + end + + context 'when a logger has been given' do + let(:constructor_params) do + super().merge(logger: mocked_logger) + end + + it 'does not create a new logger' do + expect(Logging.logger).not_to receive(:[]) + method_call + end + end +end + +RSpec.shared_examples_for 'Indexable#push' do + let(:constructor_params) do + super().merge(batch_size: 10) + end + + let(:expected_data) do + { + body: [ + { + index: { + _index: index_name, + _type: 'nested', + data: data + } + } + ] * 10 + } + end + + context 'when the amount of data is smaller than the batch size' do + it 'enqueues the data but does not push it to Elasticsearch' do + expect(client).not_to receive(:bulk) + 5.times { described_method.call } + end + end + + context 'when the amount of data matches the batch size' do + it 'puts the data with the correct structure in the queue' do + expect(client).to receive(:bulk).with(expected_data) + 10.times { described_method.call } + end + end + + context 'when the amount of data goes over the batch size' do + it 'pushes the data to Elasticsearch every time the batch size is hit' do + expect(client).to receive(:bulk).with(expected_data).exactly(3).times + 30.times { described_method.call } + end + end +end + +RSpec.shared_examples_for 
'Indexable#index' do + before do + allow(client).to receive(:index).and_return(successful_response) + end + + shared_examples_for '#index when no type is specified' do + let(:expected_data) do + { + index: index_name, + type: 'nested', + body: data + } + end + + it 'sends the given data to Elasticsearch right away (with the "nested" type)' do + expect(client).to receive(:index).with(expected_data) + method_call + end + + it 'returns the expected Hash' do + expect(method_call).to eq(successful_response) + end + end + + context 'when no type is specified' do + it_behaves_like '#index when no type is specified' + end + + context 'when type is specified as "nested"' do + let(:method_params) { { type: 'nested' } } + + it_behaves_like '#index when no type is specified' + end + + context 'when type is set to nil' do + let(:method_params) { { type: nil } } + + let(:expected_data) do + { + index: index_name, + type: nil, + body: data + } + end + + it "sends the given data to Elasticsearch right away (with 'type' set to nil)" do + expect(client).to receive(:index).with(expected_data) + method_call + end + + it 'returns the expected Hash' do + expect(method_call).to eq(successful_response) + end + end + + context 'when type is set to an invalid value' do + let(:method_params) { { type: 'flatten' } } + + it 'raises an ArgumentError' do + expect { method_call }.to raise_error(ArgumentError, "Unsupported type: 'flatten'") + end + end +end + +RSpec.shared_examples_for 'Indexable#queue_size' do + let(:constructor_params) do + super().merge(batch_size: 15) + end + + context 'with no items have been pushed to the queue' do + it 'returns 0' do + expect(method_call).to eq(0) + end + end + + context 'when a single item is pushed to the index' do + it 'increases accordingly' do + expect { indexable.push(data) }.to change(indexable, :queue_size).by(1) + end + end + + context "when less than 'batch_size' items are pushed to the queue" do + before do + 10.times { indexable.push(data) } + 
end + + it 'returns the correct number of items' do + expect(method_call).to eq(10) + end + end + + context "when 'batch_size' items are pushed to the index" do + it 'goes back down to zero' do + expect(client).to receive(:bulk).once # Checks that the items are pushed when the queue gets full. + 15.times { indexable.push(data) } + expect(method_call).to be(0) + end + end + + context "when more than 'batch_size' items are pushed to the index" do + it 'returns the expected number of items' do + expect(client).to receive(:bulk).once # Checks that the items are pushed when the queue gets full. + 20.times { indexable.push(data) } + expect(method_call).to be(5) + end + end +end + +RSpec.shared_examples_for 'Indexable#flush' do + let(:constructor_params) do + super().merge(batch_size: 5) + end + + context 'when there is no data' do + it 'does not try to push any data' do + expect(client).not_to receive(:bulk) + method_call + end + end + + context 'when there is data in the buffer' do + before { 3.times { indexable.push(data) } } + + it 'pushes the data currently held in the queue to Elasticsearch' do + expect(client).to receive(:bulk).once + method_call + end + + it 'clears the queue' do + expect { method_call }.to change(indexable, :queue_size).from(3).to(0) + end + + context 'when there is an error on the Elasticsearch instance' do + let(:expected_message) do + "An error occurred when pushing the data to Elasticsearch:\n" \ + '{"type"=>"illegal_argument_exception", "reason"=>' \ + '"mapper [test_env.report_meta.SUT tags.cluster] of ' \ + 'different type, current_type [long], merged_type [text]"}' + end + + before do + allow(client).to receive(:bulk).and_return(error_response) + end + + it 'raises an error' do + expect { method_call }.to raise_error( + JayAPI::Elasticsearch::Errors::ElasticsearchError, expected_message + ) + end + end + end +end + +RSpec.shared_examples_for 'Indexable#search' do + let(:response) do + { + 'took' => 1, + 'timed_out' => false, + '_shards' 
=> { + 'total' => 5, + 'successful' => 5, + 'skipped' => 0, + 'failed' => 0 + }, + 'hits' => { + 'total' => { + 'value' => 33, + 'relation' => 'eq' + }, + 'max_score' => nil, + 'hits' => [] + } + } + end + + let(:response_object) { instance_double(JayAPI::Elasticsearch::Response, size: 33) } + let(:batch_counter) { instance_double(JayAPI::Elasticsearch::BatchCounter) } + + before do + allow(JayAPI::Elasticsearch::QueryResults).to receive(:new) + allow(JayAPI::Elasticsearch::Response).to receive(:new).with(response).and_return(response_object) + + allow(JayAPI::Elasticsearch::BatchCounter).to receive(:create_or_update) + .with(batch_counter, query, 33).and_return(batch_counter) + end + + it 'sends the expected query to Elasticsearch' do + expect(client).to receive(:search).with(expected_query) + method_call + end + + context 'when Elasticsearch responds with an error' do + let(:expected_error) { Elasticsearch::Transport::Transport::Errors::BadRequest } + + let(:expected_log) do + <<~TEXT + The 'search' query is invalid: { + "query": { + "query_string": { + "fields": [ + "test_case.identifier" + ], + "query": "\\"Elite::Tools::Jay::ElasticsearchIndex/#push/Puts the data with the correct structure in the queue\\"" + } + }, + "sort": [ + { + "test_case.finished_at.keyword": { + "order": "desc" + } + } + ] + } + TEXT + end + + before do + allow(client).to receive(:search).and_raise(expected_error) + end + + it 'raises the error to the caller' do + expect { method_call }.to raise_error(expected_error) + end + + it 'logs the query which caused the error message' do + expect(mocked_logger).to receive(:error).with(expected_log.strip) + + expect { method_call }.to raise_error(expected_error) + end + end + + context 'when Elasticsearch responds with a valid response' do + context "without 'search_after' type parameter in the options" do + it 'creates a new instance of the QueryResults class and passes the response' do + expect(JayAPI::Elasticsearch::QueryResults).to 
receive(:new) + .with(index: indexable, query: query, response: response_object, batch_counter: batch_counter) + + method_call + end + end + + context "with 'search_after' type parameter in the options" do + let(:method_params) { { batch_counter: batch_counter, type: :search_after } } + + before do + allow(JayAPI::Elasticsearch::SearchAfterResults).to receive(:new) + end + + it 'creates a new instance of the SearchAfterResults class and passes the response' do + expect(JayAPI::Elasticsearch::SearchAfterResults).to receive(:new) + .with(index: indexable, query: query, response: response_object, batch_counter: batch_counter) + + method_call + end + end + end +end + +RSpec.shared_examples_for 'Indexable#delete_by_query' do + let(:successful_response) do + { + 'took' => 103, + 'timed_out' => false, + 'total' => 76, + 'deleted' => 76, + 'batches' => 1, + 'version_conflicts' => 0, + 'noops' => 0, + 'retries' => { 'bulk' => 0, 'search' => 0 }, + 'throttled_millis' => 0, + 'requests_per_second' => 1.0, + 'throttled_until_millis' => 0, + 'failures' => [] + } + end + + before do + allow(client).to receive(:delete_by_query).and_return(successful_response) + end + + it 'relays the command to the Elasticsearch client' do + expect(client).to receive(:delete_by_query).with( + index: index_name, body: query + ) + + method_call + end + + context 'when a custom number of slices is to be used' do + let(:method_params) { { slices: 5 } } + + it 'relays the command to the Elasticsearch client' do + expect(client).to receive(:delete_by_query).with( + index: index_name, body: query, slices: 5 + ) + + method_call + end + end + + context 'when the client should not wait for completion' do + let(:method_params) { { wait_for_completion: false } } + + it 'relays the command to the Elasticsearch client' do + expect(client).to receive(:delete_by_query).with( + index: index_name, body: query, wait_for_completion: false + ) + + method_call + end + end + + context 'when the client should not wait 
for completion and should use a custom number of slices' do + let(:method_params) { { slices: 5, wait_for_completion: false } } + + it 'relays the command to the Elasticsearch client' do + expect(client).to receive(:delete_by_query).with( + index: index_name, body: query, slices: 5, wait_for_completion: false + ) + + method_call + end + end + + context 'when the deletion succeeds' do + context 'when the deletion has been executed synchronously (i.e., `wait_for_completion` is `true`)' do + let(:expected_hash) do + { + took: 103, + timed_out: false, + total: 76, + deleted: 76, + batches: 1, + version_conflicts: 0, + noops: 0, + retries: { bulk: 0, search: 0 }, + throttled_millis: 0, + requests_per_second: 1.0, + throttled_until_millis: 0, + failures: [] + } + end + + it 'returns the expected Hash' do + expect(method_call).to eq(expected_hash) + end + end + + context 'when the deletion has been executed asynchronously (i.e., `wait_for_completion` is `false`)' do + let(:method_params) { { wait_for_completion: false } } + + let(:successful_response) do + { + 'task' => 'B5oDyEsHQu2Q-wpbaMSMTg:577388264' + } + end + + let(:expected_hash) do + { + task: 'B5oDyEsHQu2Q-wpbaMSMTg:577388264' + } + end + + it 'returns the expected Hash' do + expect(method_call).to eq(expected_hash) + end + end + + context 'when the deletion fails' do + let(:error) do + [ + Elasticsearch::Transport::Transport::Errors::Unauthorized, + '[401] Unauthorized' + ] + end + + before do + allow(client).to receive(:delete_by_query).and_raise(*error) + end + + it 're-raises the error' do + expect { method_call }.to raise_error(*error) + end + end + end +end + +RSpec.shared_examples_for 'Indexable#delete_by_query_async' do + let(:async) do + instance_double( + JayAPI::Elasticsearch::Async, + delete_by_query: response + ) + end + + let(:response) do + {} + end + + before do + allow(JayAPI::Elasticsearch::Async).to receive(:new).and_return(async) + end + + it 'creates the expected Async object' do + 
expect(JayAPI::Elasticsearch::Async).to receive(:new).with(indexable) + method_call + end + + it 'makes the Async object delete by query' do + expect(async).to receive(:delete_by_query).with(query, slices: 5) + method_call + end + + it 'returns what the Async object returns when deleting by query' do + expect(method_call).to eq(response) + end +end From d336cad04886e50fd0043b96c3e61db039c931b5 Mon Sep 17 00:00:00 2001 From: Sergio Bobillier Date: Tue, 19 Aug 2025 11:30:08 +0200 Subject: [PATCH 3/6] [JAY-676] Modify Indexable to work with multiple indexes This is possible because some of the methods of the underlying Elasticsearch client can accept and act over multiple indexes. For those that can only act over a single index, like #index and #bulk extra logic is added to handle this on the Jay API side. In order to keep backwards compatibility a small amount of logic is added to the Index class so that the constructor parameters and the return values of the methods can stay the same. --- lib/jay_api/elasticsearch/index.rb | 41 ++++++++++++++++++- lib/jay_api/elasticsearch/indexable.rb | 29 ++++++------- spec/jay_api/elasticsearch/index_spec.rb | 4 +- .../jay_api/elasticsearch/indexable_shared.rb | 8 ++-- 4 files changed, 60 insertions(+), 22 deletions(-) diff --git a/lib/jay_api/elasticsearch/index.rb b/lib/jay_api/elasticsearch/index.rb index 8e1a52b..4ca1336 100644 --- a/lib/jay_api/elasticsearch/index.rb +++ b/lib/jay_api/elasticsearch/index.rb @@ -9,7 +9,46 @@ module Elasticsearch class Index include ::JayAPI::Elasticsearch::Indexable - attr_reader :index_name + # @param [JayAPI::Elasticsearch::Client] client The Elasticsearch Client object. + # @param [String] index_name The name of the Elasticsearch index. + # @param [Integer] batch_size The size of the batch. When this many items + # are pushed into the index they are flushed to the Elasticsearch + # instance. 
+ # @param [Logging::Logger, nil] logger The logger object to use, if + # none is given a new one will be created. + def initialize(client:, index_name:, batch_size: 100, logger: nil) + super(client: client, index_names: [index_name], batch_size: batch_size, logger: logger) + end + + # @return [String] The name of the Elasticsearch index. + def index_name + @index_name ||= index_names.first + end + + # Sends a record to the Elasticsearch instance right away. + # @param [Hash] data The data to be sent. + # @param [String, nil] type The type of the document. When set to +nil+ + # the decision is left to Elasticsearch's API. Which will normally + # default to +_doc+. + # @return [Hash] A Hash containing information about the created document. + # An example of such Hash is: + # + # { + # "_index" => "xyz01_unit_test", + # "_type" => "nested", + # "_id" => "SVY1mJEBQ5CNFZM8Lodt", + # "_version" => 1, + # "result" => "created", + # "_shards" => { "total" => 2, "successful" => 1, "failed" => 0 }, + # "_seq_no" => 0, + # "_primary_term" => 1 + # } + # + # For information on the contents of this Hash please see: + # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#docs-index-api-response-body + def index(data, type: DEFAULT_DOC_TYPE) + super.first + end end end end diff --git a/lib/jay_api/elasticsearch/indexable.rb b/lib/jay_api/elasticsearch/indexable.rb index c5c82cb..02f4c57 100644 --- a/lib/jay_api/elasticsearch/indexable.rb +++ b/lib/jay_api/elasticsearch/indexable.rb @@ -29,17 +29,17 @@ module Indexable # :reek:ControlParameter (want to avoid the creating of the logger on method definition) # @param [JayAPI::Elasticsearch::Client] client The Elasticsearch Client object. - # @param [String] index_name The name of the Elasticsearch index. + # @param [Array] index_names The names of the Elasticsearch indexes. # @param [Integer] batch_size The size of the batch. 
When this many items # are pushed into the index they are flushed to the Elasticsearch # instance. # @param [Logging::Logger, nil] logger The logger object to use, if # none is given a new one will be created. - def initialize(client:, index_name:, batch_size: 100, logger: nil) + def initialize(client:, index_names:, batch_size: 100, logger: nil) @logger = logger || Logging.logger[self] @client = client - @index_name = index_name + @index_names = index_names @batch_size = batch_size @batch = [] @@ -49,7 +49,10 @@ def initialize(client:, index_name:, batch_size: 100, logger: nil) # Elasticsearch instance, only puts it into the send queue). # @param [Hash] data The data to be pushed to the index. def push(data) - batch << { index: { _index: index_name, _type: 'nested', data: data } } + index_names.each do |index_name| + batch << { index: { _index: index_name, _type: 'nested', data: data } } + end + flush! if batch.size >= batch_size end @@ -58,8 +61,8 @@ def push(data) # @param [String, nil] type The type of the document. When set to +nil+ # the decision is left to Elasticsearch's API. Which will normally # default to +_doc+. - # @return [Hash] A hash with information about the created document. An - # example of such Hash is: + # @return [Array] An array with hashes containing information about + # the created documents. An example of such Hashes is: # # { # "_index" => "xyz01_unit_test", @@ -77,7 +80,7 @@ def push(data) def index(data, type: DEFAULT_DOC_TYPE) raise ArgumentError, "Unsupported type: '#{type}'" unless SUPPORTED_TYPES.include?(type) - client.index index: index_name, type: type, body: data + index_names.map { |index_name| client.index index: index_name, type: type, body: data } end # Performs a query on the index. @@ -92,7 +95,7 @@ def index(data, type: DEFAULT_DOC_TYPE) # query fails. 
def search(query, batch_counter: nil, type: nil) begin - response = Response.new(client.search(index: index_name, body: query)) + response = Response.new(client.search(index: index_names, body: query)) rescue ::Elasticsearch::Transport::Transport::Errors::BadRequest logger.error "The 'search' query is invalid: #{JSON.pretty_generate(query)}" raise @@ -148,7 +151,7 @@ def queue_size # @raise [Elasticsearch::Transport::Transport::ServerError] If the # query fails. def delete_by_query(query, slices: nil, wait_for_completion: true) - request_params = { index: index_name, body: query }.tap do |params| + request_params = { index: index_names, body: query }.tap do |params| params.merge!(slices: slices) if slices params.merge!(wait_for_completion: false) unless wait_for_completion end @@ -174,13 +177,7 @@ def delete_by_query_async(query, slices: nil) private - attr_reader :logger, :batch - - # @raise [NotImplementedError] Is always raised, class including the - # module **must** implement this method. - def index_name - raise NotImplementedError, "Please implement the method #{__method__} in #{self.class}" - end + attr_reader :index_names, :logger, :batch # Scans the Elasticsearch response in search for the first item that has # an erroneous state and raises an error including the error details. 
diff --git a/spec/jay_api/elasticsearch/index_spec.rb b/spec/jay_api/elasticsearch/index_spec.rb index 9fc6f88..b9d9cea 100644 --- a/spec/jay_api/elasticsearch/index_spec.rb +++ b/spec/jay_api/elasticsearch/index_spec.rb @@ -155,7 +155,7 @@ let(:expected_query) do { - index: index_name, + index: [index_name], body: query } end @@ -185,6 +185,8 @@ } end + let(:index_names) { [index_name] } + it_behaves_like 'Indexable#delete_by_query' end diff --git a/spec/jay_api/elasticsearch/indexable_shared.rb b/spec/jay_api/elasticsearch/indexable_shared.rb index bae8e77..aeaeefd 100644 --- a/spec/jay_api/elasticsearch/indexable_shared.rb +++ b/spec/jay_api/elasticsearch/indexable_shared.rb @@ -369,7 +369,7 @@ it 'relays the command to the Elasticsearch client' do expect(client).to receive(:delete_by_query).with( - index: index_name, body: query + index: index_names, body: query ) method_call @@ -380,7 +380,7 @@ it 'relays the command to the Elasticsearch client' do expect(client).to receive(:delete_by_query).with( - index: index_name, body: query, slices: 5 + index: index_names, body: query, slices: 5 ) method_call @@ -392,7 +392,7 @@ it 'relays the command to the Elasticsearch client' do expect(client).to receive(:delete_by_query).with( - index: index_name, body: query, wait_for_completion: false + index: index_names, body: query, wait_for_completion: false ) method_call @@ -404,7 +404,7 @@ it 'relays the command to the Elasticsearch client' do expect(client).to receive(:delete_by_query).with( - index: index_name, body: query, slices: 5, wait_for_completion: false + index: index_names, body: query, slices: 5, wait_for_completion: false ) method_call From c70b83f2004a447ff2325a33aaf38ac2f5e6e0a2 Mon Sep 17 00:00:00 2001 From: Sergio Bobillier Date: Thu, 7 Aug 2025 17:36:09 +0200 Subject: [PATCH 4/6] [JAY-676] Add the Elasticsearch::Indexes class The class represents a set of Elasticsearch indexes. 
Through the class it is possible to query multiple indexes or push data to them at the same time. The class doesn't really add any logic. All the functional logic is just taken from the Indexable mixin. Because of the way the #push method works, when the batch_size is not a multiple of the number of indexes it is possible for the pushed batch to be slightly bigger than batch_size. Hence a warning is logged when this condition is detected by the Indexes class's constructor. --- CHANGELOG.md | 4 + lib/jay_api/elasticsearch.rb | 1 + lib/jay_api/elasticsearch/indexes.rb | 34 ++ spec/jay_api/elasticsearch/index_spec.rb | 129 +++++- .../jay_api/elasticsearch/indexable_shared.rb | 136 +----- spec/jay_api/elasticsearch/indexes_spec.rb | 433 ++++++++++++++++++ 6 files changed, 603 insertions(+), 134 deletions(-) create mode 100644 lib/jay_api/elasticsearch/indexes.rb create mode 100644 spec/jay_api/elasticsearch/indexes_spec.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index 05df220..971569a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ Please mark backwards incompatible changes with an exclamation mark at the start ## [Unreleased] +### Added +- The `Elasticsearch::Indexes` class. A class which allows multiple indexes to + be used (fed or queried) at the same time. 
+ ## [28.3.0] - 2025-06-05 ### Added diff --git a/lib/jay_api/elasticsearch.rb b/lib/jay_api/elasticsearch.rb index 6d1dc8c..f4d233e 100644 --- a/lib/jay_api/elasticsearch.rb +++ b/lib/jay_api/elasticsearch.rb @@ -6,6 +6,7 @@ require_relative 'elasticsearch/client_factory' require_relative 'elasticsearch/errors' require_relative 'elasticsearch/index' +require_relative 'elasticsearch/indexes' require_relative 'elasticsearch/query_builder' require_relative 'elasticsearch/query_results' require_relative 'elasticsearch/response' diff --git a/lib/jay_api/elasticsearch/indexes.rb b/lib/jay_api/elasticsearch/indexes.rb new file mode 100644 index 0000000..6981f70 --- /dev/null +++ b/lib/jay_api/elasticsearch/indexes.rb @@ -0,0 +1,34 @@ +# frozen_string_literal: true + +require_relative 'indexable' + +module JayAPI + module Elasticsearch + # Represents a group of Elasticsearch indexes. Allows the execution of + # searches over all of the specified indexes or push data to all of them + # at the same time. + class Indexes + include ::JayAPI::Elasticsearch::Indexable + + # @param [JayAPI::Elasticsearch::Client] client The Elasticsearch Client object. + # @param [Array] index_names The names of the Elasticsearch indexes. + # @param [Integer] batch_size The size of the batch. When this many items + # are pushed into the indexes they are flushed to the Elasticsearch + # instance. + # @param [Logging::Logger, nil] logger The logger object to use, if + # none is given a new one will be created. + def initialize(client:, index_names:, batch_size: 100, logger: nil) + super + + return if (batch_size % index_names.size).zero? + + self.logger.warn( + "'batch_size' is not a multiple of the number of elements in 'index_names'. 
" \ + "This can lead to a _bulk size slightly bigger than 'batch_size'" + ) + end + + attr_reader :index_names + end + end +end diff --git a/spec/jay_api/elasticsearch/index_spec.rb b/spec/jay_api/elasticsearch/index_spec.rb index b9d9cea..dc92cf3 100644 --- a/spec/jay_api/elasticsearch/index_spec.rb +++ b/spec/jay_api/elasticsearch/index_spec.rb @@ -52,7 +52,44 @@ # Needed in order to be able to repeat the method call let(:described_method) { -> { index.push(data) } } - it_behaves_like 'Indexable#push' + let(:constructor_params) do + super().merge(batch_size: 10) + end + + let(:expected_data) do + { + body: [ + { + index: { + _index: index_name, + _type: 'nested', + data: data + } + } + ] * 10 + } + end + + context 'when the amount of data is smaller than the batch size' do + it 'enqueues the data but does not push it to Elasticsearch' do + expect(client).not_to receive(:bulk) + 5.times { described_method.call } + end + end + + context 'when the amount of data matches the batch size' do + it 'puts the data with the correct structure in the queue' do + expect(client).to receive(:bulk).with(expected_data) + 10.times { described_method.call } + end + end + + context 'when the amount of data goes over the batch size' do + it 'pushes the data to Elasticsearch every time the batch size is hit' do + expect(client).to receive(:bulk).with(expected_data).exactly(3).times + 30.times { described_method.call } + end + end end describe '#index' do @@ -73,15 +110,105 @@ } end + before do + allow(client).to receive(:index).and_return(successful_response) + end + + shared_examples_for '#index when no type is specified' do + let(:expected_data) do + { + index: index_name, + type: 'nested', + body: data + } + end + + it 'sends the given data to Elasticsearch right away (with the "nested" type)' do + expect(client).to receive(:index).with(expected_data) + method_call + end + + it 'returns the expected Hash' do + expect(method_call).to eq(successful_response) + end + end + + context 
'when no type is specified' do + it_behaves_like '#index when no type is specified' + end + + context 'when type is specified as "nested"' do + let(:method_params) { { type: 'nested' } } + + it_behaves_like '#index when no type is specified' + end + + context 'when type is set to nil' do + let(:method_params) { { type: nil } } + + let(:expected_data) do + { + index: index_name, + type: nil, + body: data + } + end + + it "sends the given data to Elasticsearch right away (with 'type' set to nil)" do + expect(client).to receive(:index).with(expected_data) + method_call + end + + it 'returns the expected Hash' do + expect(method_call).to eq(successful_response) + end + end + it_behaves_like 'Indexable#index' end describe '#queue_size' do subject(:method_call) { index.queue_size } + let(:constructor_params) do + super().merge(batch_size: 15) + end + let(:indexable) { index } it_behaves_like 'Indexable#queue_size' + + context 'when a single item is pushed to the index' do + it 'increases accordingly' do + expect { indexable.push(data) }.to change(indexable, :queue_size).by(1) + end + end + + context "when less than 'batch_size' items are pushed to the queue" do + before do + 10.times { indexable.push(data) } + end + + it 'returns the correct number of items' do + expect(method_call).to eq(10) + end + end + + context "when 'batch_size' items are pushed to the index" do + it 'goes back down to zero' do + expect(client).to receive(:bulk).once # Checks that the items are pushed when the queue gets full. + 15.times { indexable.push(data) } + expect(method_call).to be(0) + end + end + + context "when more than 'batch_size' items are pushed to the index" do + it 'returns the expected number of items' do + expect(client).to receive(:bulk).once # Checks that the items are pushed when the queue gets full. 
+ 20.times { indexable.push(data) } + expect(method_call).to be(5) + end + end end describe '#flush' do diff --git a/spec/jay_api/elasticsearch/indexable_shared.rb b/spec/jay_api/elasticsearch/indexable_shared.rb index aeaeefd..12772e5 100644 --- a/spec/jay_api/elasticsearch/indexable_shared.rb +++ b/spec/jay_api/elasticsearch/indexable_shared.rb @@ -16,7 +16,8 @@ double( Logging::Logger, info: true, - error: true + error: true, + warn: true ) end # rubocop:enable RSpec/VerifiedDoubles @@ -46,104 +47,9 @@ end end -RSpec.shared_examples_for 'Indexable#push' do - let(:constructor_params) do - super().merge(batch_size: 10) - end - - let(:expected_data) do - { - body: [ - { - index: { - _index: index_name, - _type: 'nested', - data: data - } - } - ] * 10 - } - end - - context 'when the amount of data is smaller than the batch size' do - it 'enqueues the data but does not push it to Elasticsearch' do - expect(client).not_to receive(:bulk) - 5.times { described_method.call } - end - end - - context 'when the amount of data matches the batch size' do - it 'puts the data with the correct structure in the queue' do - expect(client).to receive(:bulk).with(expected_data) - 10.times { described_method.call } - end - end - - context 'when the amount of data goes over the batch size' do - it 'pushes the data to Elasticsearch every time the batch size is hit' do - expect(client).to receive(:bulk).with(expected_data).exactly(3).times - 30.times { described_method.call } - end - end -end - RSpec.shared_examples_for 'Indexable#index' do - before do - allow(client).to receive(:index).and_return(successful_response) - end - - shared_examples_for '#index when no type is specified' do - let(:expected_data) do - { - index: index_name, - type: 'nested', - body: data - } - end - - it 'sends the given data to Elasticsearch right away (with the "nested" type)' do - expect(client).to receive(:index).with(expected_data) - method_call - end - - it 'returns the expected Hash' do - 
expect(method_call).to eq(successful_response) - end - end - - context 'when no type is specified' do - it_behaves_like '#index when no type is specified' - end - - context 'when type is specified as "nested"' do - let(:method_params) { { type: 'nested' } } - - it_behaves_like '#index when no type is specified' - end - - context 'when type is set to nil' do - let(:method_params) { { type: nil } } - - let(:expected_data) do - { - index: index_name, - type: nil, - body: data - } - end - - it "sends the given data to Elasticsearch right away (with 'type' set to nil)" do - expect(client).to receive(:index).with(expected_data) - method_call - end - - it 'returns the expected Hash' do - expect(method_call).to eq(successful_response) - end - end - context 'when type is set to an invalid value' do - let(:method_params) { { type: 'flatten' } } + let(:method_params) { { type: 'flatten' } } it 'raises an ArgumentError' do expect { method_call }.to raise_error(ArgumentError, "Unsupported type: 'flatten'") @@ -152,47 +58,11 @@ end RSpec.shared_examples_for 'Indexable#queue_size' do - let(:constructor_params) do - super().merge(batch_size: 15) - end - context 'with no items have been pushed to the queue' do it 'returns 0' do expect(method_call).to eq(0) end end - - context 'when a single item is pushed to the index' do - it 'increases accordingly' do - expect { indexable.push(data) }.to change(indexable, :queue_size).by(1) - end - end - - context "when less than 'batch_size' items are pushed to the queue" do - before do - 10.times { indexable.push(data) } - end - - it 'returns the correct number of items' do - expect(method_call).to eq(10) - end - end - - context "when 'batch_size' items are pushed to the index" do - it 'goes back down to zero' do - expect(client).to receive(:bulk).once # Checks that the items are pushed when the queue gets full. 
- 15.times { indexable.push(data) } - expect(method_call).to be(0) - end - end - - context "when more than 'batch_size' items are pushed to the index" do - it 'returns the expected number of items' do - expect(client).to receive(:bulk).once # Checks that the items are pushed when the queue gets full. - 20.times { indexable.push(data) } - expect(method_call).to be(5) - end - end end RSpec.shared_examples_for 'Indexable#flush' do diff --git a/spec/jay_api/elasticsearch/indexes_spec.rb b/spec/jay_api/elasticsearch/indexes_spec.rb new file mode 100644 index 0000000..6b25480 --- /dev/null +++ b/spec/jay_api/elasticsearch/indexes_spec.rb @@ -0,0 +1,433 @@ +# frozen_string_literal: true + +require 'jay_api/elasticsearch/indexes' + +require_relative 'indexable_shared' + +RSpec.describe JayAPI::Elasticsearch::Indexes do + subject(:indexes) { described_class.new(**constructor_params) } + + let(:constructor_params) do + { + index_names: index_names, + client: client + } + end + + let(:index_names) { %w[xyz01_integration_test xyz01_unit_test xyz01_traceability] } + + let(:successful_response) do + { + 'took' => 152, + 'errors' => false, + 'items' => [ + { + 'index' => { + '_index' => 'xyz01_integration_test', + '_type' => 'nested', + '_id' => 'gHKq628BVvaJMVLXnxsb', + '_version' => 1, + 'result' => 'created', + '_shards' => { 'total' => 2, 'successful' => 1, 'failed' => 0 }, + '_seq_no' => 1, + '_primary_term' => 1, + 'status' => 201 + } + }, + { + 'index' => { + '_index' => 'xyz01_unit_test', + '_type' => 'nested', + '_id' => 'gHKq628BVvaJMVLXnxsb', + '_version' => 1, + 'result' => 'created', + '_shards' => { 'total' => 2, 'successful' => 1, 'failed' => 0 }, + '_seq_no' => 1, + '_primary_term' => 1, + 'status' => 201 + } + }, + { + 'index' => { + '_index' => 'xyz01_traceability', + '_type' => 'nested', + '_id' => 'gHKq628BVvaJMVLXnxsb', + '_version' => 1, + 'result' => 'created', + '_shards' => { 'total' => 2, 'successful' => 1, 'failed' => 0 }, + '_seq_no' => 1, + 
'_primary_term' => 1, + 'status' => 201 + } + } + ] + } + end + + let(:data) { { type: 'example', name: 'Test data example' } } + + include_context 'with mocked objects for Elasticsearch::Indexable' + + it 'does not respond to #index_name' do + expect(indexes).not_to respond_to(:index_name) + end + + describe '#initialize' do + subject(:method_call) { indexes } + + shared_examples_for '#initialize when the batch_size is not a multiple of the number of indexes' do + it 'logs a warning telling the user that the batch size might be overshot' do + expect(mocked_logger).to receive(:warn).with( + "'batch_size' is not a multiple of the number of elements in 'index_names'. " \ + "This can lead to a _bulk size slightly bigger than 'batch_size'" + ) + + method_call + end + end + + context 'when the batch size is not specified' do + it_behaves_like 'Indexable#initialize' + + it_behaves_like '#initialize when the batch_size is not a multiple of the number of indexes' + end + + context 'when the batch size is not a multiple of the number of indexes' do + let(:constructor_params) { super().merge(batch_size: 10) } + + it_behaves_like 'Indexable#initialize' + + it_behaves_like '#initialize when the batch_size is not a multiple of the number of indexes' + end + + context 'when the batch size is a multiple of the number of indexes' do + let(:constructor_params) { super().merge(batch_size: 15) } + + it 'does not log any warnings' do + expect(mocked_logger).not_to receive(:warn) + method_call + end + + it_behaves_like 'Indexable#initialize' + end + end + + describe '#index_names' do + let(:method_call) { indexes.index_names } + + it 'returns the expected value (the index names passed to the class constructor)' do + expect(method_call).to eq(%w[xyz01_integration_test xyz01_unit_test xyz01_traceability]) + end + end + + describe '#push' do + subject(:method_call) { described_method.call } + + # Needed in order to be able to repeat the method call + let(:described_method) { -> { 
indexes.push(data) } } + + let(:constructor_params) do + super().merge(batch_size: 15) + end + + let(:expected_data) do + { + body: index_names.cycle(5).map do |index_name| + { + index: { + _index: index_name, + _type: 'nested', + data: data + } + } + end + } + end + + context 'when the amount of data is smaller than the batch size' do + it 'enqueues the data but does not push it to Elasticsearch' do + expect(client).not_to receive(:bulk) + 3.times { described_method.call } + end + end + + context 'when the amount of data matches the batch size' do + it 'puts the data with the correct structure in the queue' do + expect(client).to receive(:bulk).with(expected_data) + 5.times { described_method.call } + end + + it 'leaves the queue empty' do + 5.times { described_method.call } + expect(indexes.queue_size).to be_zero + end + end + + context 'when the amount of data goes over the batch size' do + it 'pushes the data to Elasticsearch every time the batch size is hit' do + expect(client).to receive(:bulk).with(expected_data).exactly(3).times + 15.times { described_method.call } + end + end + end + + describe '#index' do + subject(:method_call) { indexes.index(data, **method_params) } + + let(:method_params) { {} } + + let(:responses) do + index_names.map do |index_name| + { + '_index' => index_name, + '_type' => 'nested', + '_id' => 'SVY1mJEBQ5CNFZM8Lodt', + '_version' => 1, + 'result' => 'created', + '_shards' => { 'total' => 2, 'successful' => 1, 'failed' => 0 }, + '_seq_no' => 0, + '_primary_term' => 1 + } + end + end + + let(:successful_response) do + [ + { + '_index' => 'xyz01_integration_test', + '_type' => 'nested', + '_id' => 'SVY1mJEBQ5CNFZM8Lodt', + '_version' => 1, + 'result' => 'created', + '_shards' => { 'total' => 2, 'successful' => 1, 'failed' => 0 }, + '_seq_no' => 0, + '_primary_term' => 1 + }, + { + '_index' => 'xyz01_unit_test', + '_type' => 'nested', + '_id' => 'SVY1mJEBQ5CNFZM8Lodt', + '_version' => 1, + 'result' => 'created', + '_shards' => { 
'total' => 2, 'successful' => 1, 'failed' => 0 }, + '_seq_no' => 0, + '_primary_term' => 1 + }, + { + '_index' => 'xyz01_traceability', + '_type' => 'nested', + '_id' => 'SVY1mJEBQ5CNFZM8Lodt', + '_version' => 1, + 'result' => 'created', + '_shards' => { 'total' => 2, 'successful' => 1, 'failed' => 0 }, + '_seq_no' => 0, + '_primary_term' => 1 + } + ] + end + + before do + allow(client).to receive(:index).and_return(*responses) + end + + shared_examples_for '#index when no type is specified' do + let(:expected_data) do + { + type: 'nested', + body: data + } + end + + it 'sends the given data to the first Elasticsearch index (with the "nested" type)' do + expect(client).to receive(:index).with(expected_data.merge(index: 'xyz01_unit_test')) + method_call + end + + it 'sends the given data to the second Elasticsearch index (with the "nested" type)' do + expect(client).to receive(:index).with(expected_data.merge(index: 'xyz01_integration_test')) + method_call + end + + it 'sends the given data to the third Elasticsearch index (with the "nested" type)' do + expect(client).to receive(:index).with(expected_data.merge(index: 'xyz01_traceability')) + method_call + end + + it 'returns the expected Array of hashes' do + expect(method_call).to eq(successful_response) + end + end + + context 'when no type is specified' do + it_behaves_like '#index when no type is specified' + end + + context 'when type is specified as "nested"' do + let(:method_params) { { type: 'nested' } } + + it_behaves_like '#index when no type is specified' + end + + context 'when type is set to nil' do + let(:method_params) { { type: nil } } + + let(:expected_data) do + { + type: nil, + body: data + } + end + + it "sends the given data to the first Elasticsearch index (with 'type' set to nil)" do + expect(client).to receive(:index).with(expected_data.merge(index: 'xyz01_unit_test')) + method_call + end + + it "sends the given data to the second Elasticsearch index (with 'type' set to nil)" do + 
expect(client).to receive(:index).with(expected_data.merge(index: 'xyz01_integration_test')) + method_call + end + + it "sends the given data to the third Elasticsearch index (with 'type' set to nil)" do + expect(client).to receive(:index).with(expected_data.merge(index: 'xyz01_traceability')) + method_call + end + + it 'returns the expected Array of hashes' do + expect(method_call).to eq(successful_response) + end + end + + it_behaves_like 'Indexable#index' + end + + describe '#queue_size' do + subject(:method_call) { indexes.queue_size } + + let(:indexable) { indexes } + + it_behaves_like 'Indexable#queue_size' + end + + describe '#flush' do + subject(:method_call) { indexes.flush } + + let(:indexable) { indexes } + + let(:error_response) do + { + 'took' => 9, + 'errors' => true, + 'items' => [ + { + 'index' => { + '_index' => 'xyz01_integration_test', + '_type' => 'nested', + '_id' => 'EExOS4cBrb-mrkbmwAGg', + 'result' => 'created', + '_shards' => { + 'total' => 2, + 'successful' => 1, + 'failed' => 0 + }, + 'status' => 201 + } + }, + { + 'index' => { + '_index' => 'xyz01_unit_test', + '_type' => 'nested', + '_id' => 'fnKi628BVvaJMVLXJxty', + 'status' => 400, + 'error' => { + 'type' => 'illegal_argument_exception', + 'reason' => 'mapper [test_env.report_meta.SUT tags.cluster] ' \ + 'of different type, current_type [long], merged_type [text]' + } + } + } + ] + } + end + + it_behaves_like 'Indexable#flush' + end + + describe '#search' do + subject(:method_call) { indexes.search(query, **method_params) } + + let(:query) do + { + query: { + query_string: { + fields: ['test_case.identifier'], + query: '"Elite::Tools::Jay::ElasticsearchIndex/#push/Puts the data with the correct structure in the queue"' + } + }, + sort: [ + { + 'test_case.finished_at.keyword': { + order: 'desc' + } + } + ] + } + end + + let(:method_params) { { batch_counter: batch_counter } } + + let(:indexable) { indexes } + + let(:expected_query) do + { + index: index_names, + body: query + } + end + 
+ it_behaves_like 'Indexable#search' + end + + describe '#delete_by_query' do + subject(:method_call) { indexes.delete_by_query(query, **method_params) } + + let(:method_params) { {} } + + let(:query) do + { + bool: { + must: [ + query_string: { + query: '"test_case.id_long: "Qualification Tests/Diagnostics/OBD/*"' + }, + range: { + field: 'test_case.finished_at', + gte: '2024/02/07 18:00:00', + lte: '2024/02/09 13:00:00' + } + ] + } + } + end + + it_behaves_like 'Indexable#delete_by_query' + end + + describe '#delete_by_query_async' do + subject(:method_call) { indexes.delete_by_query_async(query, slices: 5) } + + let(:query) do + { + bool: { + must: [ + range: { field: 'test_case.finished_at', gte: '2024/02/07 18:00:00', lte: '2024/02/09 13:00:00' } + ] + } + } + end + + let(:indexable) { indexes } + + it_behaves_like 'Indexable#delete_by_query_async' + end +end From 0c60955ce66b796afc79efb84eaffa998e7e9f45 Mon Sep 17 00:00:00 2001 From: Sergio Bobillier Date: Mon, 18 Aug 2025 14:55:52 +0200 Subject: [PATCH 5/6] [JAY-676] Change Index to Indexable in documentation Changes the documentation for some methods to make it clear that they can actually take any class which includes the Indexable mixin as opposed to requiring explicitly an Elasticsearch::Index object. 
--- lib/jay_api/elasticsearch/async.rb | 4 ++-- lib/jay_api/elasticsearch/query_results.rb | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/jay_api/elasticsearch/async.rb b/lib/jay_api/elasticsearch/async.rb index 0c2de8f..07ae3a9 100644 --- a/lib/jay_api/elasticsearch/async.rb +++ b/lib/jay_api/elasticsearch/async.rb @@ -19,8 +19,8 @@ class Async def_delegators :index, :index_name - # @param [JayAPI::Elasticsearch::Index] index The elasticsearch index on - # which to execute asynchronous operations + # @param [JayAPI::Elasticsearch::Indexable] index The elasticsearch + # index or indexes on which to execute asynchronous operations def initialize(index) @index = index end diff --git a/lib/jay_api/elasticsearch/query_results.rb b/lib/jay_api/elasticsearch/query_results.rb index 29009d9..2f4c81c 100644 --- a/lib/jay_api/elasticsearch/query_results.rb +++ b/lib/jay_api/elasticsearch/query_results.rb @@ -21,8 +21,8 @@ class QueryResults def_delegators :response, :hits, :total, :size, :count, :first, :last, :any?, :empty?, :aggregations # Creates a new instance of the class. - # @param [JayAPI::Elasticsearch::Index] index The Elasticsearch - # index used to perform the query. + # @param [JayAPI::Elasticsearch::Indexable] index The Elasticsearch + # index or indexes over which the query should be performed. # @param [Hash] query The query that produced the results. # @param [JayAPI::Elasticsearch::Results] response An object containing Docs retrieved from Elasticsearch. # @param [JayAPI::Elasticsearch::BatchCounter] batch_counter An object keeping track of the current batch. From 6e68bce351af6d047c1681cffeb84f9b336c450a Mon Sep 17 00:00:00 2001 From: Sergio Bobillier Date: Mon, 18 Aug 2025 15:31:16 +0200 Subject: [PATCH 6/6] [JAY-676] Add Indexable's documentation Adds documentation about the Indexable mixin and the classes that include it. 
Just like in previous commits the common elements are being moved to a shared place, in this case, the documentation for Index now resides in the documentation page for Indexable. --- .../{index.rst => indexable.rst} | 88 +++++-------------- .../elasticsearch/indexable/index.rst | 55 ++++++++++++ .../elasticsearch/indexable/indexes.rst | 67 ++++++++++++++ .../elasticsearch/query_builder.rst | 4 +- 4 files changed, 148 insertions(+), 66 deletions(-) rename documentation/source/user_guidelines/elasticsearch/{index.rst => indexable.rst} (55%) create mode 100644 documentation/source/user_guidelines/elasticsearch/indexable/index.rst create mode 100644 documentation/source/user_guidelines/elasticsearch/indexable/indexes.rst diff --git a/documentation/source/user_guidelines/elasticsearch/index.rst b/documentation/source/user_guidelines/elasticsearch/indexable.rst similarity index 55% rename from documentation/source/user_guidelines/elasticsearch/index.rst rename to documentation/source/user_guidelines/elasticsearch/indexable.rst index 2c85e4b..23ad555 100644 --- a/documentation/source/user_guidelines/elasticsearch/index.rst +++ b/documentation/source/user_guidelines/elasticsearch/indexable.rst @@ -1,62 +1,22 @@ -Index -===== +Indexable +========= -This class represents an Index inside an Elasticsearch cluster. It provides a -set of methods that allow the user to query the index and add new data. +The following methods are common to the following classes, which include the +``Indexable`` mixin: -The class also keeps a buffer of documents waiting to be pushed to the index, -the user can add documents to the buffer and the class will push them as soon as -the buffer is full. The user can also force the push of the records by flushing -the buffer. +.. toctree:: + :maxdepth: 2 + :glob: -To initialize an index: - -.. 
code-block:: ruby - - client = JayAPI::Elasticsearch::ClientFactory.new( - cluster_url: 'https://my-cluster.elastic.io' - ).create(max_attempts: 3, wait_strategy: :constant, wait_interval: 2) - - index = JayAPI::Elasticsearch::Index.new( - client: client, - index_name: 'my_index' - ) - -The ``cluster_url`` and the ``index_name`` are the only required parameters. If -the cluster is configured to use Elasticsearch's default port (``9200``) and has -no authentication in place this is all you need. However in most cases that -would not be enough, so you can also provide the following extra parameters: - -* ``port``: The port number where the Elasticsearch cluster is listening for - connections. -* ``username``: The username to use when authentication against the cluster. -* ``password``: The user's password -* ``batch_size``: The amount of documents the ``Index`` will store in its buffer - before triggering an automatic flush. -* ``logger``: If you want the messages to be logged to a particular logger. If - you don't pass a logger then the class will create one. - -The ``create`` method, that returns the client object, also takes optional arguments, -which define connection re-try behaviour: - -* ``max_attempts``: Sets the maximum number of reconnection attempts in - response to server errors. -* ``wait_strategy``: Determines the strategy for wait intervals between - reconnection attempts. Options are: - - * ``:constant`` - Maintains a consistent wait time specified by ``wait_time``. - * ``:geometric`` - Increases the wait time geometrically based on ``wait_time``. - -* ``wait_time``: Specifies the base wait time (in seconds) for the chosen - ``wait_strategy``. + indexable/* #push ----- -The ``push`` method stores a document in the ``Index``'s buffer. If the buffer +The ``push`` method stores a document in the ``Indexable``'s buffer. If the buffer reaches the maximum number of records the buffer will be flushed automatically. 
-``push`` takes a single ``Hash``, the document you want to send to the index. +``push`` takes a single ``Hash``, the document you want to send to the index(es). .. warning:: @@ -70,10 +30,10 @@ Example: documents.each do |document| # do something with your document, then push it - index.push(document) + indexable.push(document) end - index.flush # Do not forget to flush the index at the end. + indexable.flush # Do not forget to flush the indexable at the end. #index ------ @@ -81,27 +41,27 @@ Example: ``index`` pushes a document directly to the Elasticsearch cluster without adding it to the buffer first. So you don't need to call ``flush``: -``index`` takes a single ``Hash``, the document you want to send to the index. +``index`` takes a single ``Hash``, the document you want to send to the index(es). Example: .. code-block:: ruby - index.index(my_document) + indexable.index(my_document) .. note:: - Pushing documents one at a time is very inefficient because the ``Index`` + Pushing documents one at a time is very inefficient because the ``Indexable`` needs to perform an HTTP Request for each one. If you want to send many documents use ``push`` instead. -.. _`Index#search`: +.. _`Indexable#search`: #search ------- -The ``search`` method allows you to search the Elasticsearch index for documents -matching the provided query. This method takes two arguments: +The ``search`` method allows you to search the Elasticsearch index(es) for +documents matching the provided query. This method takes two arguments: * ``query`` A ``Hash`` with the query you want to execute, this Hash will be converted to JSON before being sent to Elasticsearch. It must follow @@ -121,7 +81,7 @@ Example: .. code-block:: ruby - index.search( + indexable.search( query: { match_all: { } }, @@ -144,10 +104,10 @@ Example: .. 
code-block:: ruby documents.each do |document| - index.push(document) + indexable.push(document) end - index.flush + indexable.flush #queue_size ----------- @@ -159,17 +119,17 @@ Example .. code-block:: ruby - index.queue_size # => 16 + indexable.queue_size # => 16 #delete_by_query ---------------- This method allows you to remove the documents that match the given query from -the index. The method has a single parameter: +the index(es). The method has a single parameter: * ``query``: A ``Hash`` with the query you want to use to match documents for deletion. For more information on this parameter or how to create queries see - the :ref:`Index#search` method documentation. + the :ref:`Indexable#search` method documentation. On success the method will return a ``Hash`` with information about the executed command, for example: diff --git a/documentation/source/user_guidelines/elasticsearch/indexable/index.rst b/documentation/source/user_guidelines/elasticsearch/indexable/index.rst new file mode 100644 index 0000000..75eee72 --- /dev/null +++ b/documentation/source/user_guidelines/elasticsearch/indexable/index.rst @@ -0,0 +1,55 @@ +Index +===== + +.. note:: + + This class includes the :doc:`../indexable` mixin. It exposes all its methods. + +This class represents an Index inside an Elasticsearch cluster. It provides a +set of methods that allow the user to query the index and add new data. + +The class also keeps a buffer of documents waiting to be pushed to the index, +the user can add documents to the buffer and the class will push them as soon as +the buffer is full. The user can also force the push of the records by flushing +the buffer. + +To initialize an index: + +.. 
code-block:: ruby + + client = JayAPI::Elasticsearch::ClientFactory.new( + cluster_url: 'https://my-cluster.elastic.io' + ).create(max_attempts: 3, wait_strategy: :constant, wait_interval: 2) + + index = JayAPI::Elasticsearch::Index.new( + client: client, + index_name: 'my_index' + ) + +The ``cluster_url`` and the ``index_name`` are the only required parameters. If +the cluster is configured to use Elasticsearch's default port (``9200``) and has +no authentication in place this is all you need. However in most cases that +would not be enough, so you can also provide the following extra parameters: + +* ``port``: The port number where the Elasticsearch cluster is listening for + connections. +* ``username``: The username to use when authenticating against the cluster. +* ``password``: The user's password. +* ``batch_size``: The number of documents the ``Index`` will store in its buffer + before triggering an automatic flush. +* ``logger``: If you want the messages to be logged to a particular logger. If + you don't pass a logger then the class will create one. + +The ``create`` method, that returns the client object, also takes optional arguments, +which define connection re-try behaviour: + +* ``max_attempts``: Sets the maximum number of reconnection attempts in + response to server errors. +* ``wait_strategy``: Determines the strategy for wait intervals between + reconnection attempts. Options are: + + * ``:constant`` - Maintains a consistent wait time specified by ``wait_time``. + * ``:geometric`` - Increases the wait time geometrically based on ``wait_time``. + +* ``wait_time``: Specifies the base wait time (in seconds) for the chosen + ``wait_strategy``. 
diff --git a/documentation/source/user_guidelines/elasticsearch/indexable/indexes.rst b/documentation/source/user_guidelines/elasticsearch/indexable/indexes.rst new file mode 100644 index 0000000..26f82b6 --- /dev/null +++ b/documentation/source/user_guidelines/elasticsearch/indexable/indexes.rst @@ -0,0 +1,67 @@ +Indexes +======= + +.. note:: + + This class includes the :doc:`../indexable` mixin. It exposes all its methods. + +This class represents a set of indexes in an elasticsearch cluster. It provides +a set of methods that allow the user to query the indexes or add new data to +all of them at the same time. + +The class works exactly as :doc:`index`. It only differs in the fact that it can +be initialized with multiple ``index_names`` and not only one, like ``Index``. + +Initializing +------------ + +Just like with ``Index`` you need an instance of ``Elasticsearch::Client``. You +can use the ``ClientFactory`` to get one: + +.. code-block:: ruby + + require 'jay_api/elasticsearch/client_factory' + + client = JayAPI::Elasticsearch::ClientFactory.new( + cluster_url: 'https://my-cluster.elastic.io' + ).create + +Then you can use the client to initialize the ``Indexes`` class: + +.. code-block:: ruby + + require 'jay_api/elasticsearch/indexes' + + indexes = JayAPI::Elasticsearch::Indexes.new( + client: client, index_names: %w[my_index my_other_index not_my_index] + ) + +The following arguments are available for the ``#initialize`` method: + +* ``client``: An instance of ``Elasticsearch::Client``. You can get one using + the ``Elasticsearch::ClientFactory`` class. +* ``index_names``: An ``Array`` of ``String``. The names of the indexes you + want to work with. +* ``batch_size``: The number of documents the ``Indexes`` class will store in + its buffer before triggering a ``#flush`` call when the ``#push`` method is + used to add data. The default is: 100. +* ``logger``: A ``Logger`` object used to log messages. 
If none is given the + ``Indexes`` object will create one of its own. + +.. warning:: + + When the ``batch_size`` isn't a multiple of the number of elements in the + ``index_names`` array there is a chance that the size of the batches sent to + Elasticsearch could be bigger than ``batch_size``. This can be avoided + simply by choosing an integer multiple of the array's size. + +#index_names +------------ + +This method returns the array of index names used to initialize the ``Indexes`` +object. + +.. warning:: + + Unlike ``Index``, ``Indexes`` objects **DO NOT** respond to the + ``#index_name`` message. diff --git a/documentation/source/user_guidelines/elasticsearch/query_builder.rst b/documentation/source/user_guidelines/elasticsearch/query_builder.rst index def157f..2bdd0b8 100644 --- a/documentation/source/user_guidelines/elasticsearch/query_builder.rst +++ b/documentation/source/user_guidelines/elasticsearch/query_builder.rst @@ -76,7 +76,7 @@ be returned if there aren't enough documents matching the query). By using ``from`` and ``size`` you can only scroll through a maximum of 10,000 documents. If you have more than that in your index, you'll have to use -:ref:`Index#search` method with ``type: :search_after``. +the :ref:`Indexable#search` method with ``type: :search_after``. #sort ----- @@ -171,7 +171,7 @@ And the use of Hashes to include or exclude parts of the document, for example: Once you have added all the clauses you want on your queries you can call ``to_h`` or ``to_query`` to get the corresponding Hash. The class converts the -query to a Hash representation that can then be passed to :ref:`Index#search` to +query to a Hash representation that can then be passed to :ref:`Indexable#search` to perform the actual search. .. note::