Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,9 @@ fabric.properties
# modules.xml
# .idea/misc.xml
# *.ipr
.idea
.idea

## Vscode Stuff ##
.vscode
airflow.cfg
unittests.cfg
Empty file added .vscode/temp.sql
Empty file.
10 changes: 8 additions & 2 deletions fileflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,20 @@ def _ensure_section_exists(section_name):
airflow_configuration.set('fileflow', 'aws_access_key_id', aws_access_key_id_env_var)
else:
boto_aws_access_key_id_default = boto_config.get('Credentials', 'aws_access_key_id')
airflow_configuration.set('fileflow', 'aws_access_key_id', boto_aws_access_key_id_default)
if boto_aws_access_key_id_default:
airflow_configuration.set('fileflow', 'aws_access_key_id', boto_aws_access_key_id_default)
else:
raise ValueError('No AWS access key_id found')

if not airflow_configuration.has_option('fileflow', 'aws_secret_access_key'):
if aws_secret_access_key_env_var:
airflow_configuration.set('fileflow', 'aws_secret_access_key', aws_secret_access_key_env_var)
else:
boto_aws_secret_access_key_default = boto_config.get('Credentials', 'aws_secret_access_key')
airflow_configuration.set('fileflow', 'aws_secret_access_key', boto_aws_secret_access_key_default)
if boto_aws_secret_access_key_default:
airflow_configuration.set('fileflow', 'aws_secret_access_key', boto_aws_secret_access_key_default)
else:
raise ValueError('No AWS secret access key found')


def get(section, key, **kwargs):
Expand Down
4 changes: 2 additions & 2 deletions fileflow/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dive_operator import DiveOperator
from dive_python_operator import DivePythonOperator
from fileflow.operators.dive_operator import DiveOperator
from fileflow.operators.dive_python_operator import DivePythonOperator

__all__ = ['DiveOperator', 'DivePythonOperator']
3 changes: 2 additions & 1 deletion fileflow/operators/dive_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ class DiveOperator(BaseOperator):
"""

def __init__(self, *args, **kwargs):
data_dependencies = kwargs.pop('data_dependencies', {})
super(DiveOperator, self).__init__(*args, **kwargs)

# The upstream dependencies
# These must always be specified
# Dictionary can contain any number of keys which must be redirected
# in the business logic to their read/parse methods.
self.data_dependencies = kwargs.pop('data_dependencies', {})
self.data_dependencies = data_dependencies

# private storage driver attribute; lazy loaded by the storage property or set by the storage property setter
self._storage = None
Expand Down
6 changes: 3 additions & 3 deletions fileflow/operators/dive_python_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
.. moduleauthor:: Miriam Sexton <miriam@industrydive.com>
"""

from airflow.operators import PythonOperator
from airflow.operators.python_operator import PythonOperator

from .dive_operator import DiveOperator
from fileflow.operators.dive_operator import DiveOperator


class DivePythonOperator(DiveOperator, PythonOperator):
Expand All @@ -20,7 +20,7 @@ class DivePythonOperator(DiveOperator, PythonOperator):
def __init__(self, python_object, python_method="run", *args, **kwargs):
self.python_object = python_object
self.python_method = python_method
kwargs['python_callable'] = None
kwargs['python_callable'] = lambda: None

super(DivePythonOperator, self).__init__(*args, **kwargs)

Expand Down
6 changes: 3 additions & 3 deletions fileflow/storage_drivers/s3_storage_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def get_read_stream(self, dag_id, task_id, execution_date):

if key is not None:
import tempfile
temp_file_stream = tempfile.TemporaryFile(mode='w+b')
temp_file_stream = tempfile.NamedTemporaryFile(mode='w+b')
key.get_file(temp_file_stream)

# Stream has been read in and is now at the end
Expand Down Expand Up @@ -118,8 +118,8 @@ def write_from_stream(self, dag_id, task_id, execution_date, stream, content_typ
:param string|None content_type: pass None to not set
"""
# S3 does not like reading streams or chunks, so for now read the whole stream into memory and write it in a single call
str = stream.read()
self.write(dag_id, task_id, execution_date, str, content_type)
string = stream.read()
self.write(dag_id, task_id, execution_date, string, content_type)

def list_filenames_in_path(self, path):
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/storage_drivers/test_fileStorageDriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def test_get_read_stream_simple(self):
string_value = open('tests/fixtures/SampleUniformData.json', 'r').read()
actual_result = driver.get_read_stream(*fake_read_stream_args)

self.assertEqual(actual_result.read(), string_value)
self.assertEqual(actual_result.read().decode('utf-8'), string_value)

def test_write(self):
"""
Expand Down
56 changes: 33 additions & 23 deletions tests/storage_drivers/test_s3StorageDriver.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# encoding=utf-8
from unittest import TestCase
from fileflow.storage_drivers import S3StorageDriver
from datetime import datetime
from mock import MagicMock
from moto import mock_s3
from nose.plugins.attrib import attr
import boto
import sys


@attr('unittest')
Expand Down Expand Up @@ -78,18 +80,11 @@ def test_get_read_stream(self):
"""
import tempfile

first_value = 'abc'
second_value_file = open('tests/fixtures/SampleUniformData.json', 'r')
second_value_string = second_value_file.read()
second_value_file.seek(0)
third_value_bytes = bytearray(['a', 'b', '\xe4'])
third_value_file = tempfile.TemporaryFile()
third_value_file.write(third_value_bytes)
third_value_file.seek(0)

dag_name = 'the_dag'
task_name = 'the_task'

# Write and read back from a string value
first_value = 'abc'
first_key = self.bucket.new_key(dag_name + '/' + task_name + '/' + '2016-01-01')
first_key.set_contents_from_string(first_value)
first_key.set_metadata('Content-Type', 'text/plain')
Expand All @@ -101,8 +96,12 @@ def test_get_read_stream(self):
task_id=task_name,
execution_date=datetime(2016, 1, 1)
)
self.assertEqual(first_value, first_result_stream.read())
self.assertEqual(first_value, first_result_stream.read().decode('utf-8'))

# Write and read back from a local JSON file
second_value_file = open('tests/fixtures/SampleUniformData.json', 'r')
second_value_string = second_value_file.read()
second_value_file.seek(0)
second_key = self.bucket.new_key(dag_name + '/' + task_name + '/' + '2016-01-02')
second_key.set_metadata('Content-Type', 'text/plain')
second_key.set_contents_from_file(second_value_file)
Expand All @@ -113,8 +112,12 @@ def test_get_read_stream(self):
task_id=task_name,
execution_date=datetime(2016, 1, 2)
)
self.assertEqual(second_value_string, second_result_stream.read())
self.assertEqual(second_value_string, second_result_stream.read().decode('utf-8'))

third_value_bytes = u'abč'.encode('utf-8')
third_value_file = tempfile.NamedTemporaryFile()
third_value_file.write(third_value_bytes)
third_value_file.seek(0)
third_key = self.bucket.new_key(dag_name + '/' + task_name + '/' + '2016-01-03')
third_key.set_contents_from_file(third_value_file)
third_key.set_acl('private')
Expand All @@ -135,7 +138,7 @@ def test_write(self):
self.driver.write('the_dag', 'the_task', datetime(1983, 9, 5), data, 'text/plain')

s3_key = self.bucket.get_key(key_name)
actual = s3_key.get_contents_as_string()
actual = s3_key.get_contents_as_string(encoding='utf-8')

self.assertEqual(actual, data)

Expand All @@ -144,9 +147,9 @@ def test_write_from_stream(self):
Test writing to S3 from a stream
"""
import tempfile
stream = tempfile.TemporaryFile()
stream = tempfile.NamedTemporaryFile()

some_string = 'abc'
some_string = b'abc'
stream.write(some_string)
stream.seek(0)

Expand All @@ -168,13 +171,14 @@ def test_write_from_stream(self):
file_data = f.read()

s3_key = self.bucket.get_key(second_key_name)
actual_data = s3_key.get_contents_as_string()
actual_data = s3_key.get_contents_as_string(encoding='utf-8')

self.assertEqual(file_data, actual_data)

del s3_key
# Try something that's not a simple string
some_values = bytearray(['a', 'b', '\xe4'])
stream = tempfile.TemporaryFile()
some_values = u'abč'.encode('utf-8')
stream = tempfile.NamedTemporaryFile()
stream.write(some_values)
stream.seek(0)

Expand All @@ -184,16 +188,22 @@ def test_write_from_stream(self):
s3_key = self.bucket.get_key(third_key_name)

# Check that the default content type was used
self.assertEqual(s3_key.content_type, 'application/octet-stream')

actual_data_as_string = s3_key.get_contents_as_string()
# There's a bug in moto under Python 3 that injects extra content types
# https://github.com/spulec/moto/issues/657
if sys.version_info[0] < 3:
self.assertEqual(s3_key.content_type, 'application/octet-stream')
else:
self.assertEqual(s3_key.content_type,
'text/plain; charset=utf-8, application/octet-stream')

actual_data_as_string = s3_key.get_contents_as_string(encoding='utf-8')
self.assertIsInstance(actual_data_as_string, str)
self.assertEqual(str(some_values), actual_data_as_string)
self.assertEqual(some_values.decode('utf-8'), actual_data_as_string)

output_stream = tempfile.TemporaryFile()
s3_key.get_file(output_stream)
output_stream.seek(0)
self.assertEqual(str(some_values), output_stream.read())
self.assertEqual(some_values, output_stream.read())

def test_list_filenames_in_path(self):
"""
Expand Down Expand Up @@ -246,6 +256,6 @@ def test_get_or_create_key(self):

# Does the get.
existing_key = self.driver.get_or_create_key(key_name)
content = existing_key.get_contents_as_string()
content = existing_key.get_contents_as_string(encoding='utf-8')

self.assertEqual(content, expected)
4 changes: 2 additions & 2 deletions tests/task_runners/test_taskRunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def test_get_upstream_stream(self):
to the storage driver's get_read_stream method.
"""
temp_stream = tempfile.TemporaryFile()
some_string = 'abcde'
some_string = b'abcde'
temp_stream.write(some_string)

mock_reader = mock.MagicMock()
Expand Down Expand Up @@ -237,7 +237,7 @@ def test_read_upstream_pandas_csv(self, mock_csv_reader):
# Test once with the default arguments
self.task_runner_instance.read_upstream_pandas_csv('dep_one')
mock_get_stream.assert_called_once_with('dep_one', None)
print 'assert called onse a'
print('assert called onse a')
mock_csv_reader.assert_called_once_with(
filename_or_stream=fake_stream,
encoding='utf-8'
Expand Down