From 166cdc232c776309656aa18935905c018ba26e0b Mon Sep 17 00:00:00 2001 From: James McCallum Date: Tue, 21 Nov 2017 15:00:39 -0800 Subject: [PATCH 1/4] Fixes for python3 and airflow 1.9 --- .vscode/temp.sql | 0 fileflow/operators/__init__.py | 4 ++-- fileflow/operators/dive_operator.py | 3 ++- fileflow/operators/dive_python_operator.py | 6 +++--- 4 files changed, 7 insertions(+), 6 deletions(-) create mode 100644 .vscode/temp.sql diff --git a/.vscode/temp.sql b/.vscode/temp.sql new file mode 100644 index 0000000..e69de29 diff --git a/fileflow/operators/__init__.py b/fileflow/operators/__init__.py index 65f8cb6..9c646af 100644 --- a/fileflow/operators/__init__.py +++ b/fileflow/operators/__init__.py @@ -1,4 +1,4 @@ -from dive_operator import DiveOperator -from dive_python_operator import DivePythonOperator +from fileflow.operators.dive_operator import DiveOperator +from fileflow.operators.dive_python_operator import DivePythonOperator __all__ = ['DiveOperator', 'DivePythonOperator'] diff --git a/fileflow/operators/dive_operator.py b/fileflow/operators/dive_operator.py index c196f5f..b2435f1 100644 --- a/fileflow/operators/dive_operator.py +++ b/fileflow/operators/dive_operator.py @@ -20,13 +20,14 @@ class DiveOperator(BaseOperator): """ def __init__(self, *args, **kwargs): + data_dependencies = kwargs.pop('data_dependencies', {}) super(DiveOperator, self).__init__(*args, **kwargs) # The upstream dependencies # These must always be specified # Dictionary can contain any number of keys which must be redirected # in the business logic to their read/parse methods. - self.data_dependencies = kwargs.pop('data_dependencies', {}) + self.data_dependencies = data_dependencies # private storage driver attribute; lazy loaded by the storage property or set by the storage property setter self._storage = None diff --git a/fileflow/operators/dive_python_operator.py b/fileflow/operators/dive_python_operator.py index 13007eb..daa822d 100644 --- a/fileflow/operators/dive_python_operator.py +++ b/fileflow/operators/dive_python_operator.py @@ -6,9 +6,9 @@ .. moduleauthor:: Miriam Sexton """ -from airflow.operators import PythonOperator +from airflow.operators.python_operator import PythonOperator -from .dive_operator import DiveOperator +from fileflow.operators.dive_operator import DiveOperator class DivePythonOperator(DiveOperator, PythonOperator): @@ -20,7 +20,7 @@ class DivePythonOperator(DiveOperator, PythonOperator): def __init__(self, python_object, python_method="run", *args, **kwargs): self.python_object = python_object self.python_method = python_method - kwargs['python_callable'] = None + kwargs['python_callable'] = lambda: None super(DivePythonOperator, self).__init__(*args, **kwargs) From 5e9e0de79c645de2c175e6a278f6750d403fe800 Mon Sep 17 00:00:00 2001 From: James McCallum Date: Tue, 21 Nov 2017 15:35:46 -0800 Subject: [PATCH 2/4] smaller py3 compatibility things --- .gitignore | 7 ++++++- fileflow/configuration.py | 10 ++++++++-- tests/task_runners/test_taskRunner.py | 2 +- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 344b9df..e7f96e8 100644 --- a/.gitignore +++ b/.gitignore @@ -143,4 +143,9 @@ fabric.properties # modules.xml # .idea/misc.xml # *.ipr -.idea \ No newline at end of file +.idea + +## Vscode Stuff ## +.vscode +airflow.cfg +unittests.cfg \ No newline at end of file diff --git a/fileflow/configuration.py b/fileflow/configuration.py index e605f1c..7b0ea24 100644 --- a/fileflow/configuration.py +++ b/fileflow/configuration.py @@ -53,14 +53,20 @@ def _ensure_section_exists(section_name): airflow_configuration.set('fileflow', 'aws_access_key_id', aws_access_key_id_env_var) else: boto_aws_access_key_id_default = boto_config.get('Credentials', 'aws_access_key_id') - airflow_configuration.set('fileflow', 'aws_access_key_id', boto_aws_access_key_id_default) + if boto_aws_access_key_id_default: + airflow_configuration.set('fileflow', 'aws_access_key_id', boto_aws_access_key_id_default) + else: + raise ValueError('No AWS access key_id found') if not airflow_configuration.has_option('fileflow', 'aws_secret_access_key'): if aws_secret_access_key_env_var: airflow_configuration.set('fileflow', 'aws_secret_access_key', aws_secret_access_key_env_var) else: boto_aws_secret_access_key_default = boto_config.get('Credentials', 'aws_secret_access_key') - airflow_configuration.set('fileflow', 'aws_secret_access_key', boto_aws_secret_access_key_default) + if boto_aws_secret_access_key_default: + airflow_configuration.set('fileflow', 'aws_secret_access_key', boto_aws_secret_access_key_default) + else: + raise ValueError('No AWS secret access key found') def get(section, key, **kwargs): diff --git a/tests/task_runners/test_taskRunner.py b/tests/task_runners/test_taskRunner.py index 66a5f17..970301e 100644 --- a/tests/task_runners/test_taskRunner.py +++ b/tests/task_runners/test_taskRunner.py @@ -237,7 +237,7 @@ def test_read_upstream_pandas_csv(self, mock_csv_reader): # Test once with the default arguments self.task_runner_instance.read_upstream_pandas_csv('dep_one') mock_get_stream.assert_called_once_with('dep_one', None) - print 'assert called onse a' + print('assert called onse a') mock_csv_reader.assert_called_once_with( filename_or_stream=fake_stream, encoding='utf-8' From 3fd98997b1431710e4e28c8acd9e20d57369ccdf Mon Sep 17 00:00:00 2001 From: James McCallum Date: Tue, 21 Nov 2017 18:45:53 -0800 Subject: [PATCH 3/4] Lots of unit test updates to address str v bytes explicitly --- fileflow/storage_drivers/s3_storage_driver.py | 6 +- simpletest.py | 38 +++++++++++++ .../storage_drivers/test_fileStorageDriver.py | 2 +- tests/storage_drivers/test_s3StorageDriver.py | 56 +++++++++++-------- tests/task_runners/test_taskRunner.py | 2 +- 5 files changed, 76 insertions(+), 28 deletions(-) create mode 100644 simpletest.py diff --git a/fileflow/storage_drivers/s3_storage_driver.py b/fileflow/storage_drivers/s3_storage_driver.py index 41a9f20..51a67b4 100644 --- a/fileflow/storage_drivers/s3_storage_driver.py +++ b/fileflow/storage_drivers/s3_storage_driver.py @@ -83,7 +83,7 @@ def get_read_stream(self, dag_id, task_id, execution_date): if key is not None: import tempfile - temp_file_stream = tempfile.TemporaryFile(mode='w+b') + temp_file_stream = tempfile.NamedTemporaryFile(mode='w+b') key.get_file(temp_file_stream) # Stream has been read in and is now at the end @@ -118,8 +118,8 @@ def write_from_stream(self, dag_id, task_id, execution_date, stream, content_typ :param string|None content_type: pass None to not set """ # S3 does not like reading streams or chunks, so for now, just set it as a string and write it that way - str = stream.read() - self.write(dag_id, task_id, execution_date, str, content_type) + string = stream.read() + self.write(dag_id, task_id, execution_date, string, content_type) def list_filenames_in_path(self, path): """ diff --git a/simpletest.py b/simpletest.py new file mode 100644 index 0000000..fa181ef --- /dev/null +++ b/simpletest.py @@ -0,0 +1,38 @@ +#encoding=utf-8 +from datetime import datetime +import tempfile +import mimetypes + +from fileflow.storage_drivers import S3StorageDriver + +import boto +from moto import mock_s3 + + + +@mock_s3 +def test_temporaryfile(file): + + + bucket_name = 's3storagesdrivertest' + conn = boto.connect_s3() + conn.create_bucket(bucket_name) + driver = S3StorageDriver('','',bucket_name) + # We need to create the bucket since this is all in Moto's 'virtual' AWS account + bucket = conn.get_bucket(bucket_name) + + key = bucket.new_key('test1') + file.write(u'abč'.encode('utf-8')) + file.seek(0) + + driver.write_from_stream('abc','123',datetime(2017,10,1), file, content_type=None) + + print('Key content type is {}'.format(key.content_type)) + +file1 = tempfile.TemporaryFile() +print('--------------------- testing tempfile') +test_temporaryfile(file1) + +print('--------------------- testing named tempfile') +file2 = tempfile.NamedTemporaryFile() +test_temporaryfile(file2) \ No newline at end of file diff --git a/tests/storage_drivers/test_fileStorageDriver.py b/tests/storage_drivers/test_fileStorageDriver.py index 1b5ed41..e6090f9 100644 --- a/tests/storage_drivers/test_fileStorageDriver.py +++ b/tests/storage_drivers/test_fileStorageDriver.py @@ -61,7 +61,7 @@ def test_get_read_stream_simple(self): string_value = open('tests/fixtures/SampleUniformData.json', 'r').read() actual_result = driver.get_read_stream(*fake_read_stream_args) - self.assertEqual(actual_result.read(), string_value) + self.assertEqual(actual_result.read().decode('utf-8'), string_value) def test_write(self): """ diff --git a/tests/storage_drivers/test_s3StorageDriver.py b/tests/storage_drivers/test_s3StorageDriver.py index e52e146..e824b02 100644 --- a/tests/storage_drivers/test_s3StorageDriver.py +++ b/tests/storage_drivers/test_s3StorageDriver.py @@ -1,3 +1,4 @@ +# encoding=utf-8 from unittest import TestCase from fileflow.storage_drivers import S3StorageDriver from datetime import datetime @@ -5,6 +6,7 @@ from moto import mock_s3 from nose.plugins.attrib import attr import boto +import sys @attr('unittest') @@ -78,18 +80,11 @@ def test_get_read_stream(self): """ import tempfile - first_value = 'abc' - second_value_file = open('tests/fixtures/SampleUniformData.json', 'r') - second_value_string = second_value_file.read() - second_value_file.seek(0) - third_value_bytes = bytearray(['a', 'b', '\xe4']) - third_value_file = tempfile.TemporaryFile() - third_value_file.write(third_value_bytes) - third_value_file.seek(0) - dag_name = 'the_dag' task_name = 'the_task' + # Write and read back from a string value + first_value = 'abc' first_key = self.bucket.new_key(dag_name + '/' + task_name + '/' + '2016-01-01') first_key.set_contents_from_string(first_value) first_key.set_metadata('Content-Type', 'text/plain') @@ -101,8 +96,12 @@ def test_get_read_stream(self): task_id=task_name, execution_date=datetime(2016, 1, 1) ) - self.assertEqual(first_value, first_result_stream.read()) + self.assertEqual(first_value, first_result_stream.read().decode('utf-8')) + # Write and read back from a local json file + second_value_file = open('tests/fixtures/SampleUniformData.json', 'r') + second_value_string = second_value_file.read() + second_value_file.seek(0) second_key = self.bucket.new_key(dag_name + '/' + task_name + '/' + '2016-01-02') second_key.set_metadata('Content-Type', 'text/plain') second_key.set_contents_from_file(second_value_file) @@ -113,8 +112,12 @@ def test_get_read_stream(self): task_id=task_name, execution_date=datetime(2016, 1, 2) ) - self.assertEqual(second_value_string, second_result_stream.read()) + self.assertEqual(second_value_string, second_result_stream.read().decode('utf-8')) + third_value_bytes = u'abč'.encode('utf-8') + third_value_file = tempfile.NamedTemporaryFile() + third_value_file.write(third_value_bytes) + third_value_file.seek(0) third_key = self.bucket.new_key(dag_name + '/' + task_name + '/' + '2016-01-03') third_key.set_contents_from_file(third_value_file) third_key.set_acl('private') @@ -135,7 +138,7 @@ def test_write(self): self.driver.write('the_dag', 'the_task', datetime(1983, 9, 5), data, 'text/plain') s3_key = self.bucket.get_key(key_name) - actual = s3_key.get_contents_as_string() + actual = s3_key.get_contents_as_string(encoding='utf-8') self.assertEqual(actual, data) @@ -144,9 +147,9 @@ def test_write_from_stream(self): Test writing to S3 from a stream """ import tempfile - stream = tempfile.TemporaryFile() + stream = tempfile.NamedTemporaryFile() - some_string = 'abc' + some_string = b'abc' stream.write(some_string) stream.seek(0) @@ -168,13 +171,14 @@ def test_write_from_stream(self): file_data = f.read() s3_key = self.bucket.get_key(second_key_name) - actual_data = s3_key.get_contents_as_string() + actual_data = s3_key.get_contents_as_string(encoding='utf-8') self.assertEqual(file_data, actual_data) + del s3_key # Try something that's not a simple string - some_values = bytearray(['a', 'b', '\xe4']) - stream = tempfile.TemporaryFile() + some_values = u'abč'.encode('utf-8') + stream = tempfile.NamedTemporaryFile() stream.write(some_values) stream.seek(0) @@ -184,16 +188,22 @@ def test_write_from_stream(self): s3_key = self.bucket.get_key(third_key_name) # Check that the default content type was used - self.assertEqual(s3_key.content_type, 'application/octet-stream') - - actual_data_as_string = s3_key.get_contents_as_string() + # There's a bug in moto with python 3 that injects content types + # https://github.com/spulec/moto/issues/657 + if sys.version_info[0] < 3: + self.assertEqual(s3_key.content_type, 'application/octet-stream') + else: + self.assertEqual(s3_key.content_type, + 'text/plain; charset=utf-8, application/octet-stream') + + actual_data_as_string = s3_key.get_contents_as_string(encoding='utf-8') self.assertIsInstance(actual_data_as_string, str) - self.assertEqual(str(some_values), actual_data_as_string) + self.assertEqual(some_values.decode('utf-8'), actual_data_as_string) output_stream = tempfile.TemporaryFile() s3_key.get_file(output_stream) output_stream.seek(0) - self.assertEqual(str(some_values), output_stream.read()) + self.assertEqual(some_values, output_stream.read()) def test_list_filenames_in_path(self): """ @@ -246,6 +256,6 @@ def test_get_or_create_key(self): # Does the get. existing_key = self.driver.get_or_create_key(key_name) - content = existing_key.get_contents_as_string() + content = existing_key.get_contents_as_string(encoding='utf-8') self.assertEqual(content, expected) diff --git a/tests/task_runners/test_taskRunner.py b/tests/task_runners/test_taskRunner.py index 970301e..07675c2 100644 --- a/tests/task_runners/test_taskRunner.py +++ b/tests/task_runners/test_taskRunner.py @@ -143,7 +143,7 @@ def test_get_upstream_stream(self): to the storage driver's get_read_stream method. """ temp_stream = tempfile.TemporaryFile() - some_string = 'abcde' + some_string = b'abcde' temp_stream.write(some_string) mock_reader = mock.MagicMock() From 22c564f7802c7b515f20a4799b3c17c603aae232 Mon Sep 17 00:00:00 2001 From: James McCallum Date: Tue, 21 Nov 2017 18:47:19 -0800 Subject: [PATCH 4/4] deleted a testing file --- simpletest.py | 38 -------------------------------------- 1 file changed, 38 deletions(-) delete mode 100644 simpletest.py diff --git a/simpletest.py b/simpletest.py deleted file mode 100644 index fa181ef..0000000 --- a/simpletest.py +++ /dev/null @@ -1,38 +0,0 @@ -#encoding=utf-8 -from datetime import datetime -import tempfile -import mimetypes - -from fileflow.storage_drivers import S3StorageDriver - -import boto -from moto import mock_s3 - - - -@mock_s3 -def test_temporaryfile(file): - - - bucket_name = 's3storagesdrivertest' - conn = boto.connect_s3() - conn.create_bucket(bucket_name) - driver = S3StorageDriver('','',bucket_name) - # We need to create the bucket since this is all in Moto's 'virtual' AWS account - bucket = conn.get_bucket(bucket_name) - - key = bucket.new_key('test1') - file.write(u'abč'.encode('utf-8')) - file.seek(0) - - driver.write_from_stream('abc','123',datetime(2017,10,1), file, content_type=None) - - print('Key content type is {}'.format(key.content_type)) - -file1 = tempfile.TemporaryFile() -print('--------------------- testing tempfile') -test_temporaryfile(file1) - -print('--------------------- testing named tempfile') -file2 = tempfile.NamedTemporaryFile() -test_temporaryfile(file2) \ No newline at end of file