diff --git a/.gitignore b/.gitignore index 5711c7a..a54b9eb 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,5 @@ # The Bundler lockfile should not be cached because its contents is arch-dependent Gemfile.lock .DS_Store +gcs_creds.json + diff --git a/lib/active_storage_encryption/encrypted_gcs_service.rb b/lib/active_storage_encryption/encrypted_gcs_service.rb index 3266114..60611bd 100644 --- a/lib/active_storage_encryption/encrypted_gcs_service.rb +++ b/lib/active_storage_encryption/encrypted_gcs_service.rb @@ -4,6 +4,8 @@ require "google/cloud/storage/service" class ActiveStorageEncryption::EncryptedGCSService < ActiveStorage::Service::GCSService + autoload :ResumableUpload, __dir__ + "/encrypted_gcs_service/resumable_upload.rb" + include ActiveStorageEncryption::PrivateUrlPolicy GCS_ENCRYPTION_KEY_LENGTH_BYTES = 32 # google wants to get a 32 byte key @@ -57,13 +59,15 @@ def url_for_direct_upload(key, expires_in:, checksum:, encryption_key:, content_ end end - def headers_for_direct_upload(key, checksum:, encryption_key:, filename: nil, disposition: nil, content_type: nil, custom_metadata: {}, **) + def headers_for_direct_upload(key, checksum: nil, encryption_key:, filename: nil, disposition: nil, content_type: "binary/octet-stream", custom_metadata: {}, **) headers = { "Content-Type" => content_type, - "Content-MD5" => checksum, # Not strictly required, but it ensures the file bytes we upload match what we want. This way google will error when we upload garbage. **gcs_encryption_key_headers(derive_service_encryption_key(encryption_key)), **custom_metadata_headers(custom_metadata) } + # Content-MD5 is very useful but it is not always possible to provide ahead of time. + # For example, when doing a resumable upload it will not be available before starting. + headers["Content-MD5"] = checksum if checksum headers["Content-Disposition"] = content_disposition_with(type: disposition, filename: filename) if filename if @config[:cache_control].present? @@ -72,6 +76,37 @@ def headers_for_direct_upload(key, checksum:, encryption_key:, filename: nil, di headers end + def compose(source_keys, destination_key, source_encryption_keys:, encryption_key:, filename: nil, content_type: nil, disposition: nil, custom_metadata: {}) + if source_keys.length != source_encryption_keys.length + raise ArgumentError, "With #{source_keys.length} keys to compose there should be exactly as many source_encryption_keys, but got #{source_encryption_keys.length}" + end + content_disposition = content_disposition_with(type: disposition, filename: filename) if disposition && filename + destination_encryption_key = derive_service_encryption_key(encryption_key) + file_for_destination = file_for(destination_key) + # ...content_type: "binary/octet-stream", **signed_url_options + + # As per https://cloud.google.com/storage/docs/xml-api/post-object-resumable the encryption key is + # provided in the headers for the resumable upload start, in the POST request + headers = headers_for_direct_upload(destination_encryption_key, encryption_key: destination_encryption_key) + + content_type ||= "binary/octet-stream" + filename ||= ActiveStorage::Filename.new(destination_key) + disposition ||= "inline" + expires_in = 30.seconds.to_i + + signed_url_params = signed_url_parameters = signed_url_parameters(disposition, filename, content_type, destination_encryption_key, expires_in) + uploader = ResumableUpload.new(file_for_destination, headers: headers, **signed_url_params) + + uploader.stream do |destination| + destination.binmode + source_keys.zip(source_encryption_keys).each do |(source_key, source_encryption_key)| + stream(source_key, encryption_key: derive_service_encryption_key(source_encryption_key)) do |chunk| + destination.write(chunk) + end + end + end + end + def download(key, encryption_key: nil, &block) if block_given? instrument :streaming_download, key: key do @@ -109,31 +144,29 @@ def stream(key, encryption_key: nil) end end - def compose(source_keys, destination_key, encryption_key:, filename: nil, content_type: nil, disposition: nil, custom_metadata: {}) - # Because we will always have a different encryption_key on a blob when created and google requires us to have the same encryption_keys on all source blobs - # we need to work this out a bit more. For now we don't need this and thus won't support it in this service. - raise NotImplementedError, "Currently composing files is not supported" - end - private - def private_url(key, expires_in:, filename:, content_type:, disposition:, encryption_key:, **remaining_options_for_streaming_url) - if private_url_policy == :require_headers - args = { - expires: expires_in, - query: { - "response-content-disposition" => content_disposition_with(type: disposition, filename: filename), - "response-content-type" => content_type - }, - headers: gcs_encryption_key_headers(derive_service_encryption_key(encryption_key)) - } + def signed_url_parameters(disposition, filename, content_type, encryption_key, expires_in) + args = { + expires: expires_in, + query: { + "response-content-disposition" => content_disposition_with(type: disposition, filename: filename), + "response-content-type" => content_type + }, + headers: gcs_encryption_key_headers(derive_service_encryption_key(encryption_key)) + } - if @config[:iam] - args[:issuer] = issuer - args[:signer] = signer - end + if @config[:iam] + args[:issuer] = issuer + args[:signer] = signer + end + args + end - file_for(key).signed_url(**args, version: :v4) + def private_url(key, expires_in:, filename:, content_type:, disposition:, encryption_key:, **remaining_options_for_streaming_url) + if private_url_policy == :require_headers + signed_url_parameters = signed_url_parameters(disposition, filename, content_type, encryption_key, expires_in) + file_for(key).signed_url(**signed_url_parameters, version: :v4) else private_url_for_streaming_via_controller(key, expires_in:, filename:, content_type:, disposition:, encryption_key:, **remaining_options_for_streaming_url) end diff --git a/lib/active_storage_encryption/encrypted_gcs_service/resumable_upload.rb b/lib/active_storage_encryption/encrypted_gcs_service/resumable_upload.rb new file mode 100644 index 0000000..862d138 --- /dev/null +++ b/lib/active_storage_encryption/encrypted_gcs_service/resumable_upload.rb @@ -0,0 +1,205 @@ +# frozen_string_literal: true + +# Unlike the AWS SDKs, the Ruby GCP SDKs do not have a built-in resumable upload feature, while that +# feature is well-supported by GCP (and has been supported for a long while). This module provides +# resumable uploads in an IO-like package, giving you an object you can write to. +# +# file = @bucket.file("upload.bin", skip_lookup: true) +# upload = ActiveStorageEncryption::ResumableGCSUpload.new(file) +# upload.stream do |io| +# io.write("Hello resumable") +# 20.times { io.write(Random.bytes(1.megabyte)) } +# end +# +# Note that to perform the resumable upload your IAM identity or machine identity must have either +# a correct key for accessing Cloud Storage, or - alternatively - run under a service account +# that is permitted to sign blobs. This maps to the "iam.serviceAccountTokenCreator" role - +# see https://github.com/googleapis/google-cloud-ruby/issues/13307 and https://cloud.google.com/iam/docs/service-account-permissions +class ActiveStorageEncryption::EncryptedGCSService::ResumableUpload + # AWS recommend 5MB as the default part size for multipart uploads. GCP recommend doing "less requests" + # in general, and they mandate that all parts except last are a multile of 256*1024. Knowing that we will + # need to hold a buffer of that size, let's just assume that the 5MB that AWS uses is a good number for part size. + CHUNK_SIZE_FOR_UPLOADS = 5 * 1024 * 1024 + + class UploadStartRefused < StandardError + end + + # When doing GCP uploads the chunks need to be sized to 256KB increments, and the output + # that we generate is not guaranteed to be chopped up this way. Also the upload for the last + # chunk is done slightly different than the preceding chunks. It is convenient to have a + # way to "chop up" an arbitrary streaming output into evenly sized chunks. + class ByteChunker + # @param chunk_size[Integer] the chunk size that all the chunks except the last one must have + # @delivery_proc the proc that will receive the bytes and the `is_last` boolean to indicate the last chunk + def initialize(chunk_size: 256 * 1024, &delivery_proc) + @chunk_size = chunk_size.to_i + # Use a fixed-capacity String instead of a StringIO since there are some advantages + # to mutable strings, if a string can be reused this saves memory + @buf_str = String.new(encoding: Encoding::BINARY, capacity: @chunk_size * 2) + @delivery_proc = delivery_proc.to_proc + end + + # Appends data to the buffer. Once the size of the chunk has been exceeded, a precisely-sized + # chunk will be passed to the `delivery_proc` + # + # @param bin_str[String] string in binary encoding + # @return self + def <<(bin_str) + @buf_str << bin_str.b + deliver_buf_in_chunks + self + end + + # Appends data to the buffer. Once the size of the chunk has been exceeded, a precisely-sized + # chunk will be passed to the `delivery_proc` + # + # @param bin_str[String] string in binary encoding + # @return [Integer] number of bytes appended to the buffer + def write(bin_str) + self << bin_str + bin_str.bytesize + end + + # Sends the last chunk to the `delivery_proc` even if there is nothing output - + # the last request will usually be needed to close the file + # + # @return void + def finish + deliver_buf_in_chunks + @delivery_proc.call(@buf_str, _is_last_chunk = true) + nil + end + + private def deliver_buf_in_chunks + while @buf_str.bytesize > @chunk_size + @delivery_proc.call(@buf_str[0...@chunk_size], _is_last_chunk = false) + @buf_str.replace(@buf_str[@chunk_size..]) + end + end + end + + # Largely inspired by https://gist.github.com/frankyn/9a5344d1b19ed50ebbf9f15f0ff92032 + # Acts like a writable object that you send data into. The object will split the data + # you send into chunks and send it to GCP cloud storage, you do not need to indicate + # the size of the output in advance. You do need to close the object to deliver the + # last chunk + class RangedPutIO + extend Forwardable + def_delegators :@chunker, :write, :finish, :<< + + # The chunks have to be sized in multiples of 256 kibibytes or 262,144 bytes + CHUNK_SIZE_UNIT = 256 * 1024 + + def initialize(put_url, chunk_size:, content_type: "binary/octet-stream") + raise ArgumentError, "chunk_size of #{chunk_size} is not a multiple of #{CHUNK_SIZE_UNIT}" unless (chunk_size % CHUNK_SIZE_UNIT).zero? + + @put_uri = URI(put_url) + @last_byte = 0 + @total_bytes = 0 + @content_type = content_type + @chunker = ByteChunker.new(chunk_size: chunk_size) { |bytes, is_last| upload_chunk(bytes, is_last) } + end + + private + + def upload_chunk(chunk, is_last) + @total_bytes += chunk.bytesize + content_range = if is_last + "bytes #{@last_byte}-#{@last_byte + chunk.bytesize - 1}/#{@total_bytes}" + else + "bytes #{@last_byte}-#{@last_byte + chunk.bytesize - 1}/*" + end + @last_byte += chunk.bytesize + + headers = { + "Content-Length" => chunk.bytesize.to_s, + "Content-Range" => content_range, + "Content-Type" => @content_type, + "Content-MD5" => Digest::MD5.base64digest(chunk) # This is to early flag bugs like the one mentioned below with httpx + } + + # Use plain old Net::HTTP here since currently version 1.4.0 of HTTPX (which is used by Faraday in our env) mangles up the file bytes before upload. + # when passing a File object directly. + # See https://cheddar-me.slack.com/archives/C01FEPX7PA9/p1739290056637849 + # and https://gitlab.com/os85/httpx/-/issues/338 + put_response = Net::HTTP.put(@put_uri, chunk, headers) + + # This is weird (from https://cloud.google.com/storage/docs/performing-resumable-uploads#resume-upload): + # Repeat the above steps for each remaining chunk of data that you want to upload, using the upper + # value contained in the Range header of each response to determine where to start each successive + # chunk; you should not assume that the server received all bytes sent in any given request. + # So in theory we must check that the "Range:" header in the response is "bytes=0-{@last_byte + chunk.bytesize - 1}" + # and we will add that soon. + # + # 308 means "intermediate chunk uploaded", 200 means "last chunk uploaded" + return if [308, 200].include?(put_response.code.to_i) + + raise "The PUT for the resumable upload responded with status #{put_response.code}, headers #{put_response.to_hash.inspect}" + end + end + + # @param [Google::Cloud::Storage::File] + def initialize(file, content_type: "binary/octet-stream", headers: {}, **signed_url_options) + @file = file + @content_type = content_type + @signed_url_options = signed_url_options # url_issuer_and_signer.merge(signed_url_options) + @resumable_upload_start_headers = headers.to_h + end + + # @yields writable[IO] an IO-ish object that responds to `#write` + def stream(&blk) + headers = {"x-goog-resumable": "start"}.merge(@resumable_upload_start_headers) + session_start_url = @file.signed_url(method: "POST", content_type: @content_type, headers: headers, **@signed_url_options) + response = Net::HTTP.post(URI(session_start_url), "", {"content-type" => @content_type, "x-goog-resumable" => "start"}) + unless response.code.to_i == 201 + raise UploadStartRefused, <<~MSG + Resumable upload start POST responded with #{response.code} instead of 201. + Body: + #{response.body} + MSG + end + + resumable_upload_session_put_url = response["location"] + writable = RangedPutIO.new(resumable_upload_session_put_url, content_type: @content_type, chunk_size: CHUNK_SIZE_FOR_UPLOADS) + yield(writable) + writable.finish + end + + private + + # This is gnarly. It is needed to allow service accounts (workload identity) to sign + # blobs - which is needed to sign a presigned POST URL. The presigned POST URL allows us + # to initiate a resumable upload. + # + # Comes from here: + # https://github.com/googleapis/google-cloud-ruby/issues/13307#issuecomment-1894546343 + def url_issuer_and_signer + env = Google::Cloud.env + if env.compute_engine? + # Issuer is the service account email that the Signed URL will be signed with + # and any permission granted in the Signed URL must be granted to the + # Google Service Account. + issuer = env.lookup_metadata "instance", "service-accounts/default/email" + + # Create a lambda that accepts the string_to_sign + signer = lambda do |string_to_sign| + iam_client = Google::Apis::IamcredentialsV1::IAMCredentialsService.new + + # Get the environment configured authorization + scopes = ["https://www.googleapis.com/auth/iam"] + iam_client.authorization = Google::Auth.get_application_default scopes + + request = Google::Apis::IamcredentialsV1::SignBlobRequest.new( + payload: string_to_sign + ) + resource = "projects/-/serviceAccounts/#{issuer}" + response = iam_client.sign_service_account_blob(resource, request) + response.signed_blob + end + + {issuer:, signer:} + else + {} + end + end +end diff --git a/test/lib/encrypted_gcs_service/byte_chunker_test.rb b/test/lib/encrypted_gcs_service/byte_chunker_test.rb new file mode 100644 index 0000000..5b36ad5 --- /dev/null +++ b/test/lib/encrypted_gcs_service/byte_chunker_test.rb @@ -0,0 +1,112 @@ +require "test_helper" + +class ActiveStorageEncryptionByteChunkerTest < ActiveSupport::TestCase + def chunker_class + ActiveStorageEncryption::EncryptedGCSService::ByteChunker + end + + test "outputs chunks with arbitrary chunk size" do + rng = Random.new(Minitest.seed) + + 32.times do + chunk_size = rng.rand(1..512) + + last_chunk_flags = [] + out_buf = StringIO.new + out_buf.binmode + + chunker = chunker_class.new(chunk_size:) do |bytes, is_last_chunk| + out_buf << bytes + last_chunk_flags << is_last_chunk + end + + blob = rng.bytes(rng.rand(1..1024)) + read_size = rng.rand(1..222) + + source_buf = StringIO.new(blob) + while (bytes = source_buf.read(read_size)) + chunker << bytes + end + chunker.finish + + assert_equal blob, out_buf.string + + *all_chunks_except_last, last_chunk_flag = last_chunk_flags + assert_equal [false], all_chunks_except_last.uniq if all_chunks_except_last.any? + assert_equal true, last_chunk_flag + end + end + + test "outputs chunks correctly when last write is at boundary" do + writes = [] + chunker = chunker_class.new(chunk_size: 3) do |chunk, is_last| + writes << chunk << is_last + end + + chunker << "a" + chunker << "b" + chunker << "c" + chunker.finish + + assert_equal ["abc", true], writes + end + + test "outputs chunks correctly when multiple chunks are required" do + writes = [] + chunker = chunker_class.new(chunk_size: 7) do |chunk, is_last| + writes << chunk << is_last + end + + ("a".."z").each do |char| + chunker << char + end + chunker.finish + + assert_equal ["abcdefg", false, "hijklmn", false, "opqrstu", false, "vwxyz", true], writes + end + + test "outputs chunks correctly when all the data is furnished in a single write" do + writes = [] + chunker = chunker_class.new(chunk_size: 7) do |chunk, is_last| + writes << chunk << is_last + end + + chunker << ("a".."z").to_a.join + chunker.finish + + assert_equal ["abcdefg", false, "hijklmn", false, "opqrstu", false, "vwxyz", true], writes + end + + test "outputs chunks correctly when only write is below chunk_size" do + writes = [] + chunker = chunker_class.new(chunk_size: 3) do |chunk, is_last| + writes << chunk << is_last + end + + chunker << "a" + chunker.finish + + assert_equal ["a", true], writes + end + + test "outputs a single zero-sized last chunk when finishing with a write of a few empty strings" do + writes = [] + chunker = chunker_class.new(chunk_size: 3) do |chunk, is_last| + writes << chunk << is_last + end + chunker << "" << "" + chunker.finish + + assert_equal ["", true], writes + end + + test "outputs a single zero-sized last chunk when finishing without writes" do + writes = [] + chunker = chunker_class.new(chunk_size: 3) do |chunk, is_last| + writes << chunk << is_last + end + chunker.finish + + assert_equal ["", true], writes + end +end diff --git a/test/lib/encrypted_gcs_service/resumable_upload_test.rb b/test/lib/encrypted_gcs_service/resumable_upload_test.rb new file mode 100644 index 0000000..c2a2f73 --- /dev/null +++ b/test/lib/encrypted_gcs_service/resumable_upload_test.rb @@ -0,0 +1,68 @@ +require "test_helper" + +class ActiveStorageEncryptionResumableGCSUploadTest < ActiveSupport::TestCase + setup do + if ENV["GOOGLE_APPLICATION_CREDENTIALS"].blank? + skip "You need GOOGLE_APPLICATION_CREDENTIALS set in your env and it needs to point to the JSON keyfile for GCS" + end + end + + def bucket + @bucket ||= begin + config = { + project_id: "sandbox-ci-25b8", + bucket: "sandbox-ci-testing-secure-documents", + private_url_policy: "stream" + } + service = ActiveStorageEncryption::EncryptedGCSService.new(**config) + service.send(:bucket) + end + end + + def random_filename + random_component = Random.bytes(4).unpack1("H*") + "test-upload-#{random_component}.bin" + end + + def test_performs_small_resumable_upload_which_is_below_chunk_threshold + test_gcp_file = bucket.file(random_filename, skip_lookup: true) + upload = ActiveStorageEncryption::EncryptedGCSService::ResumableUpload.new(test_gcp_file) + assert_nothing_raised do + upload.stream do |sink| + sink.write("Hello from a tiny resumable upload") + end + end + + # Wait for the file to get composed - this can take a little while after a resumable upload + loop do + break if test_gcp_file.exists? + sleep 0.1 + end + + readback = test_gcp_file.download.read + assert_equal "Hello from a tiny resumable upload", readback + end + + def test_performs_sizeable_resumable_upload + rng = Random.new(Minitest.seed) + test_gcp_file = bucket.file(random_filename, skip_lookup: true) + upload = ActiveStorageEncryption::EncryptedGCSService::ResumableUpload.new(test_gcp_file, content_type: "x-top-secret/binary") + assert_nothing_raised do + upload.stream do |sink| + 2.times do + sink.write(rng.bytes(5 * 1024 * 1024 + 1)) + end + end + end + + # Wait for the file to get composed - this can take a little while after a resumable upload + loop do + break if test_gcp_file.exists? + sleep 0.1 + end + + expected_size = (5 * 1024 * 1024 + 1) * 2 + assert_equal expected_size, test_gcp_file.size + assert_equal "x-top-secret/binary", test_gcp_file.content_type + end +end diff --git a/test/lib/encrypted_gcs_service_test.rb b/test/lib/encrypted_gcs_service_test.rb index 214001a..81357e9 100644 --- a/test/lib/encrypted_gcs_service_test.rb +++ b/test/lib/encrypted_gcs_service_test.rb @@ -151,6 +151,25 @@ def test_basic_gcs_readback assert_equal readback, plaintext_upload_bytes end + def test_compose + rng = Random.new(Minitest.seed) + + parts = 7.times.map do + {key: "#{run_id}-part-#{rng.hex(4)}", encryption_key: rng.bytes(32), bytes: rng.bytes(1024)} + end + + parts.each do |part| + @service.upload(part[:key], StringIO.new(part[:bytes]), encryption_key: part[:encryption_key]) + end + + source_keys = parts.map { |part| part[:key] } + source_encryption_keys = parts.map { |part| part[:encryption_key] } + destination_key = "#{run_id}-comp-#{rng.hex(4)}" + encryption_key = rng.bytes(32) + + @service.compose(source_keys, destination_key, source_encryption_keys:, encryption_key:) + end + def test_accepts_direct_upload_with_signature_and_headers rng = Random.new(Minitest.seed)