diff --git a/lib/logstash/inputs/salesforce.rb b/lib/logstash/inputs/salesforce.rb index b690af0..3b9d2a4 100755 --- a/lib/logstash/inputs/salesforce.rb +++ b/lib/logstash/inputs/salesforce.rb @@ -33,7 +33,7 @@ # username => 'email@example.com' # password => 'super-secret' # security_token => 'SECURITY TOKEN FOR THIS USER' -# sfdc_object_name => 'Opportunity' +# sfdc_object_names => ['Account','User'] # } # } # @@ -52,68 +52,117 @@ class LogStash::Inputs::Salesforce < LogStash::Inputs::Base # Set this to true to connect to a sandbox sfdc instance # logging in through test.salesforce.com config :use_test_sandbox, :validate => :boolean, :default => false + # By default, this uses the default Restforce API version. # To override this, set this to something like "32.0" for example config :api_version, :validate => :string, :required => false + # Consumer Key for authentication. You must set up a new SFDC # connected app with oath to use this output. More information # can be found here: # https://help.salesforce.com/apex/HTViewHelpDoc?id=connected_app_create.htm config :client_id, :validate => :string, :required => true + # Consumer Secret from your oauth enabled connected app config :client_secret, :validate => :string, :required => true + # A valid salesforce user name, usually your email address. # Used for authentication and will be the user all objects # are created or modified by config :username, :validate => :string, :required => true + # The password used to login to sfdc config :password, :validate => :string, :required => true + # The security token for this account. For more information about # generting a security token, see: # https://help.salesforce.com/apex/HTViewHelpDoc?id=user_security_token.htm config :security_token, :validate => :string, :required => true - # The name of the salesforce object you are creating or updating - config :sfdc_object_name, :validate => :string, :required => true + + # A list of the salesforce objects you are pulling. If you are specifying + # more than one, you should probably be very careful about using the sfdc_fields + # and sfdc_filters configuration options. + config :sfdc_object_names, :validate => :array, :required => true + # These are the field names to return in the Salesforce query # If this is empty, all fields are returned. + # NOTE: If specifying multiple objects to pull, these fields must + # be valid for ALL objects being pulled. Using this with multiple + # sObjects is probably not a good idea. config :sfdc_fields, :validate => :array, :default => [] + # These options will be added to the WHERE clause in the # SOQL statement. Additional fields can be filtered on by # adding field1 = value1 AND field2 = value2 AND... + # NOTE: If specifying multiple objects to pull, these filters must + # be valid for ALL objects being pulled. config :sfdc_filters, :validate => :string, :default => "" + # Setting this to true will convert SFDC's NamedFields__c to named_fields__c config :to_underscores, :validate => :boolean, :default => false + # This will add a field to the event letting you know what sObject the event is. + # This is useful for filtering when specifying multiple objects names to query. + config :sfdc_object_type_field, :validate => :string, :required => false + + # Interval to run the command. Value is in seconds. If no interval is given, + # this plugin only fetches data once. + config :interval, :validate => :number, :required => false, :default => -1 + + public def register require 'restforce' - obj_desc = client.describe(@sfdc_object_name) - @sfdc_field_types = get_field_types(obj_desc) - @sfdc_fields = get_all_fields if @sfdc_fields.empty? end # def register public def run(queue) - results = client.query(get_query()) - if results && results.first - results.each do |result| - event = LogStash::Event.new() - decorate(event) - @sfdc_fields.each do |field| - field_type = @sfdc_field_types[field] - value = result.send(field) - event_key = @to_underscores ? underscore(field) : field - if not value.nil? - case field_type - when 'datetime', 'date' - event.set(event_key, format_time(value)) - else - event.set(event_key, value) + while !stop? + start = Time.now + @sfdc_object_names.each do |sfdc_object_name| + obj_desc = client.describe(sfdc_object_name) + @sfdc_field_types = get_field_types(obj_desc) + current_sfdc_fields = (@sfdc_fields.empty?) ? get_all_fields : @sfdc_fields + + results = client.query(get_query(current_sfdc_fields,sfdc_object_name)) + if results && results.first + results.each do |result| + event = LogStash::Event.new() + decorate(event) + current_sfdc_fields.each do |field| + field_type = @sfdc_field_types[field] + value = result.send(field) + event_key = @to_underscores ? underscore(field) : field + if not value.nil? + case field_type + when 'datetime', 'date' + event.set(event_key, format_time(value)) + else + event.set(event_key, value) + end + end end + event.set(@sfdc_object_type_field, sfdc_object_name) if @sfdc_object_type_field + queue << event end end - queue << event - end + end # loop sObjects + if @interval == -1 + break + else + duration = Time.now - start + # Sleep for the remainder of the interval, or 0 if the duration ran + # longer than the interval. + sleeptime = [0, @interval - duration].max + if sleeptime == 0 + @logger.warn("Execution ran longer than the interval. Skipping sleep.", + :duration => duration, + :interval => @interval) + else + sleep(sleeptime) + end + end # end interval check + Stud.stoppable_sleep(@interval) { stop? } end end # def run @@ -137,11 +186,11 @@ def client_options end private - def get_query() - query = ["SELECT",@sfdc_fields.join(','), - "FROM",@sfdc_object_name] + def get_query(sfdc_fields,sfdc_object_name) + query = ["SELECT",sfdc_fields.join(','), + "FROM",sfdc_object_name] query << ["WHERE",@sfdc_filters] unless @sfdc_filters.empty? - query << "ORDER BY LastModifiedDate DESC" if @sfdc_fields.include?('LastModifiedDate') + query << "ORDER BY LastModifiedDate DESC" if sfdc_fields.include?('LastModifiedDate') query_str = query.flatten.join(" ") @logger.debug? && @logger.debug("SFDC Query", :query => query_str) return query_str