import { TransferProgressEvent } from '@azure/core-http';
import { PagedAsyncIterableIterator } from '@azure/core-paging';
import { BlobServiceClient, BlockBlobClient } from '@azure/storage-blob';
import { from, Observable, Subscriber } from 'rxjs';
import { distinctUntilChanged, scan, startWith } from 'rxjs/operators';
import {
    BlobContainerRequest,
    BlobFileRequest,
    BlobStorageRequest,
    IBlobStorageTransfer
} from '../types/azure-storage';

/**
 * Lifecycle marker attached to blob transfer notifications.
 *
 * The numeric values are spelled out so the mapping stays stable if members
 * are ever reordered; they match the implicit zero-based numbering.
 */
export enum BlobTransferStatus {
    /** Initial state, before any transfer activity has been reported. */
    NotStarted = 0,
    /** The transfer has begun and may still be reporting progress. */
    Started = 1,
    /** The transfer has finished. */
    Completed = 2
}

/**
 * RxJS-facing wrapper around the Azure Blob Storage SDK.
 *
 * Every public method builds a fresh SDK client from the request's storage
 * URI and shared access signature, so the service itself holds no connection
 * state between calls.
 */
export class BlobStorageService {
    /**
     * Lists the containers of the storage account described by `request`.
     *
     * @returns An observable that first emits an empty array and then, for each
     * container yielded by the SDK, a new array holding all containers seen so far.
     */
    getContainers(request: BlobStorageRequest) {
        const blobServiceClient = this.buildClient(request);
        return this.asyncToObservable(blobServiceClient.listContainers());
    }

    /**
     * Lists the blobs of `request.containerName` using a flat listing.
     *
     * @returns An observable that first emits an empty array and then, for each
     * blob yielded by the SDK, a new array holding all blobs seen so far.
     */
    listBlobsInContainer(request: BlobContainerRequest) {
        const containerClient = this.getContainerClient(request);
        return this.asyncToObservable(containerClient.listBlobsFlat());
    }

    /**
     * Downloads the blob named `request.filename`.
     *
     * The SDK call is issued immediately when this method is invoked (the
     * promise is created before it is wrapped), not when the observable is
     * subscribed to.
     *
     * @returns An observable of the SDK's download response.
     */
    downloadBlobItem(request: BlobFileRequest) {
        const blockBlobClient = this.getBlockBlobClient(request);
        return from(blockBlobClient.download());
    }

    /**
     * Deletes the blob named `request.filename`.
     *
     * As with {@link downloadBlobItem}, the SDK call is issued when this method
     * is invoked rather than on subscription.
     *
     * @returns An observable of the SDK's delete response.
     */
    deleteBlobItem(request: BlobFileRequest) {
        const blockBlobClient = this.getBlockBlobClient(request);
        return from(blockBlobClient.delete());
    }

    /**
     * Uploads `file` to the blob named `request.filename`.
     *
     * @param file Browser file whose content and MIME type are uploaded.
     * @param request Storage account, container and blob name to upload to.
     * @param abortSignal Signal handed to the SDK so the caller can cancel the upload.
     * @returns An observable of transfer notifications; see {@link uploadFile}
     * for the notification contract.
     */
    uploadToBlobStorage(
        file: File,
        request: BlobFileRequest,
        abortSignal: AbortSignal
    ): Observable<IBlobStorageTransfer> {
        const blockBlobClient = this.getBlockBlobClient(request);
        return this.uploadFile(blockBlobClient, file, abortSignal);
    }

    /** Resolves the block blob client for `request.filename` inside the request's container. */
    private getBlockBlobClient(request: BlobFileRequest) {
        const containerClient = this.getContainerClient(request);
        return containerClient.getBlockBlobClient(request.filename);
    }

    /** Resolves the container client for `request.containerName`. */
    private getContainerClient(request: BlobContainerRequest) {
        const blobServiceClient = this.buildClient(request);
        return blobServiceClient.getContainerClient(request.containerName);
    }

    /** Creates a new service client from the connection string derived from `options`. */
    private buildClient(options: BlobStorageRequest) {
        return BlobServiceClient.fromConnectionString(
            this.buildConnectionString(options)
        );
    }

    /**
     * Starts the upload on subscription and translates SDK callbacks into
     * transfer notifications.
     *
     * Notification contract:
     * - each SDK progress callback emits the loaded byte count with status `Started`;
     * - success emits `file.size` with status `Completed`, then completes;
     * - failure emits a notification carrying the error, then completes
     *   (the observable's error channel is not used).
     *
     * Consecutive notifications with identical content are collapsed. A custom
     * comparator is required for that: every notification is a freshly created
     * object, so the default reference comparison would never treat two of
     * them as equal.
     */
    private uploadFile(
        blockBlobClient: BlockBlobClient,
        file: File,
        abortSignal: AbortSignal
    ): Observable<IBlobStorageTransfer> {
        return new Observable<IBlobStorageTransfer>(observer => {
            blockBlobClient
                .uploadBrowserData(file, {
                    onProgress: this.onProgress(observer),
                    abortSignal: abortSignal,
                    // Conditional request: per Azure's If-None-Match semantics this is
                    // intended to reject the upload when the blob already exists.
                    conditions: {
                        ifNoneMatch: '*'
                    },
                    blobHTTPHeaders: {
                        blobContentType: file.type
                    }
                })
                .then(
                    this.onUploadComplete(observer, file),
                    this.onUploadError(observer)
                );
        }).pipe(distinctUntilChanged(BlobStorageService.isSameTransfer));
    }

    /**
     * Field-wise equality for transfer notifications.
     *
     * `error` is part of the comparison so that a failure notification is
     * never swallowed when it follows a progress notification with the same
     * byte count and status.
     */
    private static isSameTransfer(
        previous: IBlobStorageTransfer,
        current: IBlobStorageTransfer
    ): boolean {
        return (
            previous.bytesTransferred === current.bytesTransferred &&
            previous.status === current.status &&
            previous.error === current.error
        );
    }

    /**
     * Builds the rejection handler for the upload promise.
     *
     * The failure is deliberately delivered as a regular notification (zero
     * bytes, status `Started`, `error` set) followed by completion, so
     * subscribers receive it through `next` rather than through the
     * observable's error channel.
     */
    private onUploadError(observer: Subscriber<IBlobStorageTransfer>) {
        return (error: any) => {
            observer.next({ bytesTransferred: 0, error: error, status: BlobTransferStatus.Started });
            observer.complete();
        };
    }

    /** Builds the fulfilment handler: reports the full file size as `Completed`, then completes. */
    private onUploadComplete(observer: Subscriber<IBlobStorageTransfer>, file: File) {
        return () => {
            observer.next({ bytesTransferred: file.size, status: BlobTransferStatus.Completed });
            observer.complete();
        };
    }

    /** Builds the SDK progress callback: forwards the loaded byte count with status `Started`. */
    private onProgress(observer: Subscriber<IBlobStorageTransfer>) {
        return (progress: TransferProgressEvent) =>
            observer.next({ bytesTransferred: progress.loadedBytes, status: BlobTransferStatus.Started });
    }

    /**
     * Adapts an SDK paged iterator into an observable of growing arrays.
     *
     * The iterator is consumed item by item once subscribed. The resulting
     * stream starts with an empty array and emits a new array (previous items
     * plus the new one) per item. Iteration stops early when the subscriber
     * has closed, and an iteration failure is forwarded as an observable error.
     */
    private asyncToObservable<T, TService>(
        iterable: PagedAsyncIterableIterator<T, TService>
    ) {
        return new Observable<T>(
            observer =>
                // `void` discards the promise so the subscribe callback returns no teardown.
                void (async () => {
                    try {
                        for await (const item of iterable as AsyncIterable<T>) {
                            // Stop pulling further items once nobody is listening.
                            if (observer.closed) return;
                            observer.next(item);
                        }
                        observer.complete();
                    } catch (e) {
                        observer.error(e);
                    }
                })()
        ).pipe(
            scan<T, T[]>((items, item) => [...items, item], []),
            startWith([] as T[])
        );
    }

    /**
     * Formats the connection string consumed by
     * `BlobServiceClient.fromConnectionString`: the blob endpoint followed by
     * the shared access signature.
     */
    private buildConnectionString = (options: BlobStorageRequest) => {
        return (
            `BlobEndpoint=${options.storageUri};` +
            `SharedAccessSignature=${options.storageAccessToken}`
        );
    };
}
