Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 70 additions & 80 deletions azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,66 +23,58 @@ import (
"path"
"time"

"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
)

const azureBlockSize = 4 * 1024 * 1024
const azureParallelism = 16

type azureBLOBStorage struct {
conf *AzureConfig
containerUrl azblob.ContainerURL
serviceUrl azblob.ServiceURL
conf *AzureConfig
client *azblob.Client
serviceUrl string
}

func NewAzure(conf *AzureConfig) (Storage, error) {
credential, err := azblob.NewSharedKeyCredential(
conf.AccountName,
conf.AccountKey,
)
cred, err := azblob.NewSharedKeyCredential(conf.AccountName, conf.AccountKey)
if err != nil {
return nil, err
}

pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{
Retry: azblob.RetryOptions{
Policy: azblob.RetryPolicyExponential,
MaxTries: 5,
MaxRetryDelay: time.Second * 5,
},
})
serviceUrl := fmt.Sprintf("https://%s.blob.core.windows.net/", conf.AccountName)
client, err := azblob.NewClientWithSharedKeyCredential(serviceUrl, cred, nil)
if err != nil {
return nil, err
}

host := fmt.Sprintf("%s.blob.core.windows.net", conf.AccountName)
return &azureBLOBStorage{
conf: conf,
serviceUrl: azblob.NewServiceURL(url.URL{
Scheme: "https",
Host: host,
}, pipeline),
containerUrl: azblob.NewContainerURL(url.URL{
Scheme: "https",
Host: host,
Path: conf.ContainerName,
}, pipeline),
conf: conf,
client: client,
serviceUrl: serviceUrl,
}, nil
}

func (s *azureBLOBStorage) location(storagePath string) string {
return (&url.URL{
Scheme: "https",
Host: s.conf.AccountName + ".blob.core.windows.net",
Host: fmt.Sprintf("%s.blob.core.windows.net", s.conf.AccountName),
Path: path.Join(s.conf.ContainerName, storagePath),
}).String()
}

func (s *azureBLOBStorage) UploadData(data []byte, storagePath, contentType string) (string, int64, error) {
blobUrl := s.containerUrl.NewBlockBlobURL(storagePath)
_, err := azblob.UploadBufferToBlockBlob(context.Background(), data, blobUrl, azblob.UploadToBlockBlobOptions{
BlobHTTPHeaders: azblob.BlobHTTPHeaders{ContentType: contentType},
BlockSize: 4 * 1024 * 1024,
Parallelism: 16,
_, err := s.client.UploadBuffer(context.Background(), s.conf.ContainerName, storagePath, data, &azblob.UploadBufferOptions{
HTTPHeaders: &blob.HTTPHeaders{BlobContentType: &contentType},
BlockSize: azureBlockSize,
Concurrency: azureParallelism,
})
if err != nil {
return "", 0, err
}

return s.location(storagePath), int64(len(data)), nil
}

Expand All @@ -100,13 +92,10 @@ func (s *azureBLOBStorage) UploadFile(filepath, storagePath, contentType string)
return "", 0, err
}

// upload blocks in parallel for optimal performance
// it calls PutBlock/PutBlockList for files larger than 256 MBs and PutBlob for smaller files
blobUrl := s.containerUrl.NewBlockBlobURL(storagePath)
_, err = azblob.UploadFileToBlockBlob(context.Background(), file, blobUrl, azblob.UploadToBlockBlobOptions{
BlobHTTPHeaders: azblob.BlobHTTPHeaders{ContentType: contentType},
BlockSize: 4 * 1024 * 1024,
Parallelism: 16,
_, err = s.client.UploadFile(context.Background(), s.conf.ContainerName, storagePath, file, &azblob.UploadFileOptions{
HTTPHeaders: &blob.HTTPHeaders{BlobContentType: &contentType},
BlockSize: azureBlockSize,
Concurrency: azureParallelism,
})
if err != nil {
return "", 0, err
Expand All @@ -118,44 +107,45 @@ func (s *azureBLOBStorage) UploadFile(filepath, storagePath, contentType string)
func (s *azureBLOBStorage) ListObjects(prefix string) ([]string, error) {
var objects []string

for marker := (azblob.Marker{}); marker.NotDone(); {
listBlob, err := s.containerUrl.ListBlobsFlatSegment(context.Background(), marker, azblob.ListBlobsSegmentOptions{
Prefix: prefix,
})
pager := s.client.NewListBlobsFlatPager(s.conf.ContainerName, &azblob.ListBlobsFlatOptions{
Prefix: &prefix,
})
for pager.More() {
page, err := pager.NextPage(context.Background())
if err != nil {
return nil, err
}

marker = listBlob.NextMarker
for _, blobInfo := range listBlob.Segment.BlobItems {
objects = append(objects, blobInfo.Name)
for _, item := range page.Segment.BlobItems {
if item.Name != nil {
objects = append(objects, *item.Name)
}
}
}

return objects, nil
}

func (s *azureBLOBStorage) DownloadData(storagePath string) ([]byte, error) {
blobUrl := s.containerUrl.NewBlobURL(storagePath)
ctx := context.Background()
blobClient := s.client.ServiceClient().NewContainerClient(s.conf.ContainerName).NewBlobClient(storagePath)

props, err := blobUrl.GetProperties(context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
props, err := blobClient.GetProperties(ctx, nil)
if err != nil {
return nil, err
}
if props.ContentLength == nil {
return nil, errors.New("azure: missing content length")
}

b := make([]byte, props.ContentLength())
err = azblob.DownloadBlobToBuffer(context.Background(), blobUrl, 0, azblob.CountToEnd, b, azblob.DownloadFromBlobOptions{
BlockSize: 4 * 1024 * 1024,
Parallelism: 16,
RetryReaderOptionsPerBlock: azblob.RetryReaderOptions{
MaxRetryRequests: 3,
},
buf := make([]byte, *props.ContentLength)
_, err = s.client.DownloadBuffer(ctx, s.conf.ContainerName, storagePath, buf, &azblob.DownloadBufferOptions{
BlockSize: azureBlockSize,
Concurrency: azureParallelism,
})
if err != nil {
return nil, err
}

return b, nil
return buf, nil
}

func (s *azureBLOBStorage) DownloadFile(filepath, storagePath string) (int64, error) {
Expand All @@ -165,13 +155,9 @@ func (s *azureBLOBStorage) DownloadFile(filepath, storagePath string) (int64, er
}
defer file.Close()

blobUrl := s.containerUrl.NewBlobURL(storagePath)
err = azblob.DownloadBlobToFile(context.Background(), blobUrl, 0, 0, file, azblob.DownloadFromBlobOptions{
BlockSize: 4 * 1024 * 1024,
Parallelism: 16,
RetryReaderOptionsPerBlock: azblob.RetryReaderOptions{
MaxRetryRequests: 3,
},
_, err = s.client.DownloadFile(context.Background(), s.conf.ContainerName, storagePath, file, &azblob.DownloadFileOptions{
BlockSize: azureBlockSize,
Concurrency: azureParallelism,
})
if err != nil {
return 0, err
Expand All @@ -189,47 +175,51 @@ func (s *azureBLOBStorage) GeneratePresignedUrl(storagePath string, expiration t
return "", errors.New("OAuth required")
}

now := time.Now()
now := time.Now().UTC()
exp := now.Add(expiration)

serviceUrl := s.serviceUrl.WithPipeline(azblob.NewPipeline(s.conf.TokenCredential, azblob.PipelineOptions{}))
udc, err := serviceUrl.GetUserDelegationCredential(
context.Background(), azblob.NewKeyInfo(now, exp), nil, nil,
)
svcClient, err := service.NewClient(s.serviceUrl, s.conf.TokenCredential, nil)
if err != nil {
return "", err
}

udc, err := svcClient.GetUserDelegationCredential(context.Background(), service.KeyInfo{
Start: to.Ptr(now.Format(sas.TimeFormat)),
Expiry: to.Ptr(exp.Format(sas.TimeFormat)),
}, nil)
if err != nil {
return "", err
}

qp, err := azblob.BlobSASSignatureValues{
Protocol: azblob.SASProtocolHTTPS,
qp, err := sas.BlobSignatureValues{
Protocol: sas.ProtocolHTTPS,
StartTime: now,
ExpiryTime: exp,
Permissions: azblob.AccountSASPermissions{Read: true}.String(),
Permissions: (&sas.BlobPermissions{Read: true}).String(),
ContainerName: s.conf.ContainerName,
BlobName: storagePath,
}.NewSASQueryParameters(udc)
}.SignWithUserDelegation(udc)
if err != nil {
return "", err
}

loc := &url.URL{
Scheme: "https",
Host: s.conf.AccountName + ".blob.core.windows.net",
Host: fmt.Sprintf("%s.blob.core.windows.net", s.conf.AccountName),
Path: path.Join(s.conf.ContainerName, storagePath),
RawQuery: qp.Encode(),
}
return loc.String(), nil
}

func (s *azureBLOBStorage) DeleteObject(storagePath string) error {
blobUrl := s.containerUrl.NewBlobURL(storagePath)
_, err := blobUrl.Delete(context.Background(), azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
_, err := s.client.DeleteBlob(context.Background(), s.conf.ContainerName, storagePath, nil)
return err
}

func (s *azureBLOBStorage) DeleteObjects(storagePaths []string) error {
for _, path := range storagePaths {
if err := s.DeleteObject(path); err != nil {
for _, p := range storagePaths {
if err := s.DeleteObject(p); err != nil {
return err
}
}
Expand Down
4 changes: 2 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package storage
import (
"time"

"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
)

type Config interface {
Expand All @@ -37,7 +37,7 @@ type AzureConfig struct {
AccountName string `yaml:"account_name,omitempty"` // (env AZURE_STORAGE_ACCOUNT)
AccountKey string `yaml:"account_key,omitempty"` // (env AZURE_STORAGE_KEY)
ContainerName string `yaml:"container_name,omitempty"`
TokenCredential azblob.TokenCredential `yaml:"-"` // required for presigned url generation
TokenCredential azcore.TokenCredential `yaml:"-"` // required for presigned url generation
}

func (c *AzureConfig) newStorage() (Storage, error) { return NewAzure(c) }
Expand Down
22 changes: 10 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
module github.com/livekit/storage

go 1.23.0

toolchain go1.24.1
go 1.25.0

require (
cloud.google.com/go/storage v1.55.0
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.1
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.7.0
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
github.com/aws/aws-sdk-go-v2 v1.36.5
github.com/aws/aws-sdk-go-v2/config v1.29.17
Expand All @@ -18,7 +17,7 @@ require (
github.com/googleapis/gax-go/v2 v2.14.2
github.com/joho/godotenv v1.5.1
github.com/livekit/protocol v1.39.2
github.com/stretchr/testify v1.10.0
github.com/stretchr/testify v1.11.1
golang.org/x/oauth2 v0.30.0
google.golang.org/api v0.238.0
)
Expand All @@ -31,7 +30,7 @@ require (
cloud.google.com/go/compute/metadata v0.7.0 // indirect
cloud.google.com/go/iam v1.5.2 // indirect
cloud.google.com/go/monitoring v1.24.2 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.12.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.27.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.51.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0 // indirect
Expand Down Expand Up @@ -60,7 +59,6 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
github.com/mattn/go-ieproxy v0.0.1 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect
Expand All @@ -80,11 +78,11 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
go.uber.org/zap/exp v0.3.0 // indirect
golang.org/x/crypto v0.39.0 // indirect
golang.org/x/net v0.41.0 // indirect
golang.org/x/sync v0.15.0 // indirect
golang.org/x/sys v0.33.0 // indirect
golang.org/x/text v0.26.0 // indirect
golang.org/x/crypto v0.51.0 // indirect
golang.org/x/net v0.54.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.44.0 // indirect
golang.org/x/text v0.37.0 // indirect
golang.org/x/time v0.12.0 // indirect
google.golang.org/genproto v0.0.0-20250505200425-f936aa4a68b2 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250512202823-5a2f75b736a9 // indirect
Expand Down
Loading