119 changes: 98 additions & 21 deletions docs/index.asciidoc
Expand Up @@ -41,7 +41,7 @@ that will be used in the SOQL query.

==== HTTP proxy

If your infrastructure uses a HTTP proxy, you can set the `SALESFORCE_PROXY_URI` environment variable with the desired URI value (e.g `export SALESFORCE_PROXY_URI="http://proxy.example.com:123"`).
If your infrastructure uses an HTTP proxy, you can set the `SALESFORCE_PROXY_URI` environment variable to the desired URI value (e.g., `export SALESFORCE_PROXY_URI="http://proxy.example.com:123"`).

==== Example
This example prints all the Salesforce Opportunities to standard out
Expand Down Expand Up @@ -75,6 +75,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
|=======================================================================
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-api_version>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-changed_data_filter>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-client_id>> |<<string,string>>|Yes
| <<plugins-{type}s-{plugin}-client_secret>> |<<password,password>>|Yes
| <<plugins-{type}s-{plugin}-interval>> |<<number,number>>|No
Expand All @@ -86,6 +87,8 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-sfdc_object_name>> |<<string,string>>|Yes
| <<plugins-{type}s-{plugin}-timeout>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-to_underscores>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-tracking_field>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-tracking_field_value_file>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-use_test_sandbox>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-use_tooling_api>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-username>> |<<string,string>>|Yes
Expand All @@ -103,7 +106,29 @@ input plugins.
* There is no default value for this setting.

By default, this uses the default Restforce API version.
To override this, set this to something like "32.0" for example
To override this, set it to a specific version, for example "32.0".

[id="plugins-{type}s-{plugin}-changed_data_filter"]
===== `changed_data_filter`

* Value type is <<string,string>>
* There is no default value for this setting.

The filter to add to the Salesforce query when a previous tracking field value
was read from the <<plugins-{type}s-{plugin}-tracking_field_value_file>>.
The string can (and should) contain a placeholder `%+{last_tracking_field_value}+` that
will be substituted with the actual value read from the <<plugins-{type}s-{plugin}-tracking_field_value_file>>.

This clause is combined, using the `AND` operator, with any
<<plugins-{type}s-{plugin}-sfdc_filters>> clause that is configured.

The value should be properly quoted according to the SOQL rules for the field
type.

**Examples:**

"changed_data_filter" => "Number > '%{last_tracking_field_value}'"
"changed_data_filter" => "LastModifiedDate >= %{last_tracking_field_value}"
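
A minimal sketch of how the placeholder substitution behaves, assuming the plugin applies Ruby's `String#%` with a hash argument (as the `get_query` change in this PR does). The stored values below are made up for illustration.

```ruby
# Hypothetical filter strings and stored values, for illustration only.
number_filter = "Number > '%{last_tracking_field_value}'"
date_filter   = "LastModifiedDate >= %{last_tracking_field_value}"

# String#% with a hash replaces %{name} with the value stored under :name.
number_clause = number_filter % { last_tracking_field_value: "A-000123" }
date_clause   = date_filter % { last_tracking_field_value: "2023-01-31T12:00:00Z" }

puts number_clause
puts date_clause
```

Because `%` is the format operator here, a literal percent sign elsewhere in the filter (for example in a `LIKE` pattern) would presumably need to be written as `%%`.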

[id="plugins-{type}s-{plugin}-client_id"]
===== `client_id`
Expand All @@ -112,10 +137,10 @@ To override this, set this to something like "32.0" for example
* Value type is <<string,string>>
* There is no default value for this setting.

Consumer Key for authentication. You must set up a new SFDC
connected app with oath to use this output. More information
Consumer Key for authentication. You must set up a new Salesforce
connected app with OAuth enabled to use this plugin. More information
can be found here:
https://help.salesforce.com/apex/HTViewHelpDoc?id=connected_app_create.htm
https://help.salesforce.com/apex/HTViewHelpDoc?id=connected_app_create.htm.

[id="plugins-{type}s-{plugin}-client_secret"]
===== `client_secret`
Expand All @@ -124,7 +149,7 @@ https://help.salesforce.com/apex/HTViewHelpDoc?id=connected_app_create.htm
* Value type is <<password,password>>
* There is no default value for this setting.

Consumer Secret from your oauth enabled connected app
Consumer secret from your OAuth-enabled connected app.

[id="plugins-{type}s-{plugin}-interval"]
===== `interval`
Expand All @@ -150,21 +175,21 @@ If this property is not specified or is set to -1, the plugin will run once and
* Value type is <<password,password>>
* There is no default value for this setting.

The password used to login to sfdc
The password used to log in to Salesforce.

[id="plugins-{type}s-{plugin}-security_token"]
===== `security_token`
===== `security_token`

* This is a required setting.
* Value type is <<password,password>>
* There is no default value for this setting.
* This is a required setting.
* Value type is <<password,password>>
* There is no default value for this setting.

The security token for this account. For more information about
generting a security token, see:
https://help.salesforce.com/apex/HTViewHelpDoc?id=user_security_token.htm
generating a security token, see:
https://help.salesforce.com/apex/HTViewHelpDoc?id=user_security_token.htm.

[id="plugins-{type}s-{plugin}-sfdc_fields"]
===== `sfdc_fields`
===== `sfdc_fields`

* Value type is <<array,array>>
* Default value is `[]`
Expand All @@ -178,9 +203,9 @@ If this is empty, all fields are returned.
* Value type is <<string,string>>
* Default value is `""`

These options will be added to the WHERE clause in the
These options will be added to the `WHERE` clause in the
SOQL statement. Additional fields can be filtered on by
adding field1 = value1 AND field2 = value2 AND...
adding `field1 = value1 AND field2 = value2 AND...`.

[id="plugins-{type}s-{plugin}-sfdc_instance_url"]
===== `sfdc_instance_url`
Expand All @@ -202,7 +227,7 @@ but not both to configure the url to which the plugin connects to.
* Value type is <<string,string>>
* There is no default value for this setting.

The name of the salesforce object you are creating or updating
The name of the Salesforce object you are creating or updating.

[id="plugins-{type}s-{plugin}-timeout"]
===== `timeout`
Expand All @@ -221,7 +246,59 @@ read, an error occurs.
* Value type is <<boolean,boolean>>
* Default value is `false`

Setting this to true will convert SFDC's NamedFields__c to named_fields__c
Setting this to true will convert Salesforce's `++NamedFields__c++` to `++named_fields__c++`.

[id="plugins-{type}s-{plugin}-tracking_field"]
===== `tracking_field`

* Value type is <<string,string>>
* There is no default value for this setting.

The field to track for incremental data loads. This field will
be used in an `ORDER BY ... ASC` clause that is added to the Salesforce query.
This field _should_ also be used in the <<plugins-{type}s-{plugin}-changed_data_filter>> clause
to actually achieve incremental loading of data.

The last value for this field in the result set (which is the highest value, because the
query sorts by this field ascending) will be saved to the file at the path configured by
<<plugins-{type}s-{plugin}-tracking_field_value_file>>, if specified.

This field should ideally be strictly ascending for new records. An
autonumber field is ideal for this.

The standard `LastModifiedDate` field can be used, but since it is not _strictly_
ascending (multiple records can have the same `LastModifiedDate`), the
<<plugins-{type}s-{plugin}-changed_data_filter>> should account for this by using the `>=`
operator, and duplicates should be expected.
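
The trade-off between `>` and `>=` on a non-strictly-ascending field can be sketched as follows. This is an illustration with made-up records, not plugin code; it assumes record `c` was written with the same timestamp as `b` but only after the first run had already queried.

```ruby
# Hypothetical records; "b" and "c" share the same timestamp.
records = [
  { id: "a", modified: "2023-01-31T12:00:00Z" },
  { id: "b", modified: "2023-01-31T12:00:05Z" },
  { id: "c", modified: "2023-01-31T12:00:05Z" },
]

# Assume the first run only saw "a" and "b", so the stored value is b's timestamp.
first_run  = records.first(2)
last_value = first_run.last[:modified]

# Timestamps in one fixed ISO 8601 format compare correctly as strings.
strict    = records.select { |r| r[:modified] >  last_value }.map { |r| r[:id] }
inclusive = records.select { |r| r[:modified] >= last_value }.map { |r| r[:id] }

puts "strict (>):     #{strict.inspect}"     # misses "c"
puts "inclusive (>=): #{inclusive.inspect}"  # returns "c", and "b" again
```

With `>` the late record is never returned; with `>=` it is returned along with a repeat of `b`, which is why downstream handling of duplicates is needed.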

Note that Salesforce does not guarantee that the standard `Id` field has ascending
values for new records (https://developer.salesforce.com/docs/atlas.en-us.apexcode.meta/apexcode/apex_testing_best_practices.htm).
Therefore, using `Id` as the tracking field risks missing records and is not recommended.

If this field is not already included in the <<plugins-{type}s-{plugin}-sfdc_fields>>,
it is added.

[id="plugins-{type}s-{plugin}-tracking_field_value_file"]
===== `tracking_field_value_file`

* Value type is <<string,string>>
* There is no default value for this setting.

The full path to the file from which the latest tracking field value from the previous
plugin invocation will be read, and to which the new latest tracking field value will be
written after the current plugin invocation.

This keeps persistent track of the last seen value of the tracking field used for incremental
loading of data.

The file should be readable and writable by the Logstash process.

If the file exists and a <<plugins-{type}s-{plugin}-changed_data_filter>> is configured,
a changed data filter clause is added to the query and combined, using the `AND` operator,
with any <<plugins-{type}s-{plugin}-sfdc_filters>> clause that is configured.

If the result set is not empty, the value for `tracking_field` from the last row is
written to the file.
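
The read-then-write cycle described above can be sketched roughly as follows. The helper names `read_last_value` and `write_last_value` are hypothetical (the plugin inlines this logic), and the temporary directory stands in for whatever path is configured.

```ruby
require "tmpdir"
require "fileutils"

# Hypothetical helper: returns nil when no previous run has stored a value.
def read_last_value(path)
  return nil if path.nil? || !File.exist?(path)
  File.read(path)
end

# Hypothetical helper: leaves the file untouched when the result set was empty.
def write_last_value(path, value)
  return if path.nil? || value.nil?
  File.write(path, value.to_s)
end

dir  = Dir.mktmpdir
path = File.join(dir, "last_tracking_field_value")

first_read = read_last_value(path)   # first run: no file yet
write_last_value(path, "A-000123")   # value from the last row of the result set
second_read = read_last_value(path)  # next run picks up the stored value
write_last_value(path, nil)          # empty result set: nothing is written
third_read = read_last_value(path)   # stored value is unchanged

FileUtils.remove_entry(dir)
```

On the first run no filter value is available, so the full data set is queried; later runs only see the stored value advance when a query actually returns rows.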

[id="plugins-{type}s-{plugin}-use_test_sandbox"]
===== `use_test_sandbox`
Expand All @@ -230,7 +307,7 @@ Setting this to true will convert SFDC's NamedFields__c to named_fields__c
* Default value is `false`

Set this to true to connect to a sandbox sfdc instance
logging in through test.salesforce.com
logging in through test.salesforce.com.

Use either this or the `sfdc_instance_url` configuration option
but not both to configure the url to which the plugin connects to.
Expand All @@ -255,9 +332,9 @@ of elements of sfdc flows) and security health check risks.
* Value type is <<string,string>>
* There is no default value for this setting.

A valid salesforce user name, usually your email address.
A valid Salesforce username, usually your email address.
Used for authentication and will be the user all objects
are created or modified by
are created or modified by.



70 changes: 65 additions & 5 deletions lib/logstash/inputs/salesforce.rb
Expand Up @@ -111,6 +111,25 @@ class LogStash::Inputs::Salesforce < LogStash::Inputs::Base
# Setting this to true will convert SFDC's NamedFields__c to named_fields__c
config :to_underscores, :validate => :boolean, :default => false

# File that stores the tracking field's latest value. This is read before querying data to interpolate
# the tracking field value into the changed_data_filter, and the latest value of the tracking field is written
# to it after all the query results have been read.
config :tracking_field_value_file, :validate => :string, :required => false

# Filter clause to use for incremental retrieval and indexing of data that has changed since the last invocation
# of the plugin. This is combined with sfdc_filters using the AND operator, if the file at tracking_field_value_file exists.
# String interpolation is applied to replace "%{last_tracking_field_value}" in this string with the value read
# from tracking_field_value_file. This would usually be something like "tracking_field > '%{last_tracking_field_value}'"
# where tracking_field is the API name of the actual tracking field set using the tracking_field configuration property,
# e.g. LastModifiedDate
config :changed_data_filter, :validate => :string, :required => false

# The field from which the last value will be stored in the tracking_field_value_file and interpolated
# for "%{last_tracking_field_value}" in the changed_data_filter expression. This field will also be used in an ORDER BY
# clause added to the query, with sorting done ascending, so that the last value in the results is also the
# highest.
config :tracking_field, :validate => :string, :required => false

# Interval to run the command. Value is in seconds. If no interval is given,
# this plugin only fetches data once.
config :interval, :validate => :number, :required => false, :default => -1
Expand All @@ -128,6 +147,7 @@ def run(queue)
while !stop?
start = Time.now
results = client.query(get_query())
latest_tracking_field_value = nil
if results && results.first
results.each do |result|
event = LogStash::Event.new()
Expand All @@ -136,7 +156,7 @@ def run(queue)
field_type = @sfdc_field_types[field]
value = result.send(field)
event_key = @to_underscores ? underscore(field) : field
if not value.nil?
unless value.nil?
case field_type
when 'datetime', 'date'
event.set(event_key, format_time(value))
Expand All @@ -146,8 +166,20 @@ def run(queue)
end
end
queue << event
unless @tracking_field.nil?
latest_tracking_field_value = result[@tracking_field]
end
end # loop sObjects
end

unless @tracking_field_value_file.nil?
unless latest_tracking_field_value.nil?
@logger.debug("Writing latest tracking field value #{latest_tracking_field_value} to #{@tracking_field_value_file}")
File.write(@tracking_field_value_file, latest_tracking_field_value)
else
@logger.debug("No tracking field value found in result, not updating " + @tracking_field_value_file)
end
end # loop sObjects
end

if @interval == -1
break
Expand Down Expand Up @@ -199,15 +231,43 @@ def client_options

private
def get_query()
query = ["SELECT", @sfdc_fields.join(','),
sfdc_fields = @sfdc_fields.dup
unless @tracking_field.nil?
unless sfdc_fields.include?(@tracking_field)
sfdc_fields << @tracking_field
end
end
query = ["SELECT", sfdc_fields.join(','),
"FROM", @sfdc_object_name]
query << ["WHERE", @sfdc_filters] unless @sfdc_filters.empty?
query << "ORDER BY LastModifiedDate DESC" if @sfdc_fields.include?('LastModifiedDate')
where = []
unless @sfdc_filters.empty?
append_to_where_clause(@sfdc_filters, where)
end
unless @changed_data_filter.nil?
if @tracking_field_value_file && File.exist?(@tracking_field_value_file)
last_tracking_field_value = File.read(@tracking_field_value_file)
changed_data_filter_interpolated = @changed_data_filter % { :last_tracking_field_value => last_tracking_field_value }
append_to_where_clause(changed_data_filter_interpolated, where)
end
end
query << where
unless @tracking_field.nil?
query << ["ORDER BY", @tracking_field, "ASC"]
end
query_str = query.flatten.join(" ")
@logger.debug? && @logger.debug("SFDC Query", :query => query_str)
return query_str
end

# Appends a filter clause to the WHERE parts: WHERE precedes the first clause, AND precedes later ones.
def append_to_where_clause(clause, where)
where << (where.empty? ? "WHERE" : "AND")
where << clause
end
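
For reference, the overall query assembly can be approximated by a standalone function. This is a sketch under stated assumptions rather than the plugin's code: `build_query` and its keyword arguments are hypothetical names, and `last_value` stands in for the contents of the tracking field value file.

```ruby
# Hypothetical standalone function mirroring the query assembly in this change.
def build_query(object_name:, fields:, filters: "", changed_data_filter: nil,
                tracking_field: nil, last_value: nil)
  # The tracking field is always selected so its last value can be stored.
  fields = fields.dup
  fields << tracking_field if tracking_field && !fields.include?(tracking_field)

  # Static filters and the interpolated changed-data filter are joined with AND.
  conditions = []
  conditions << filters unless filters.empty?
  if changed_data_filter && last_value
    conditions << (changed_data_filter % { last_tracking_field_value: last_value })
  end

  query = ["SELECT", fields.join(","), "FROM", object_name]
  query << "WHERE" << conditions.join(" AND ") unless conditions.empty?
  # Ascending order makes the last row carry the highest tracking value.
  query << "ORDER BY" << tracking_field << "ASC" if tracking_field
  query.join(" ")
end

options = {
  object_name: "Opportunity",
  fields: ["Id", "Name"],
  filters: "IsClosed = false",
  changed_data_filter: "LastModifiedDate >= %{last_tracking_field_value}",
  tracking_field: "LastModifiedDate",
}

first_query = build_query(**options)
later_query = build_query(**options, last_value: "2023-01-31T12:00:00Z")

puts first_query
puts later_query
```

The first call models a run with no stored value, so only the static filter applies; the second models a later run where the stored value narrows the result set.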

private
def get_field_types(obj_desc)
field_types = {}