Wpis z mikrobloga

Oto ostateczna wersja kodu w TypeScript, co wysyła pliki na serwer w częściach. Co tu się dzieje:

1. W funkcji initUpload() wysyłamy do serwera metadane, aby zweryfikować, czy plik nie został wcześniej wgrany. Trzeba jeszcze dodać liczenie SHA-256, a z tym mam trochę obawy, bo film nagrany telefonem może ważyć 2 GB, a wrzucenie całego pliku do pamięci, aby skorzystać z Crypto API (bo chyba strumieniowo się nie da), spowoduje wykrzaczenie się przeglądarki.

2. Następnie dodajemy plik do kolejki i przesyłamy pierwszy fragment pliku.

3. Operator expand() to rekurencja w RxJS. Z tym najdłużej się zmagałem, gdyż kod się zapętlał. Otóż expand() wykonuje się zarówno kiedy dostanie wartość na wejściu, jak i na wyjściu. A jeśli w expand() zwrócimy wartość inną niż EMPTY (która teoretycznie kończy strumień, lecz czeka na zakończenie strumienia na wyjściu (this.api.patchBytes). Szukałem takiego operatora, który zawsze czeka na zakończenie strumienia na wyjściu lub na jasny sygnał, że ma nastąpić rekurencja, ale nie znalazłem. Po wielu próbach udało się zapobiec nieskończonej rekurencji i innym niepożądanym skutkom (np. żądanie HTTP z ostatnim kawałkiem pliku było przerywane).

Wydajnościowo na słabszych urządzeniach ten kod może klęknąć, bo RxJS swój narzut dodaje.

Wiecie, co teraz zrobię? Wywalę podejście reaktywne (po co mi to było, diabeł mnie skusił) i przywrócę podejście imperatywne. Z głównej funkcji zwrócę jakiś uchwyt, który pozwoli wstrzymać, zatrzymać i kontynuować upload pliku. Obecnie możliwe jest tylko całkowite zatrzymanie poprzez unsubscribe() (jeszcze nie testowałem, ale mam nadzieję, że zadziała).

Na końcu i tak wleci serwer CDN i kod pójdzie do zaorania. Ech...

#programowanie #rxjs #angular

private initAndQueue(initUrl: string, file: File, id?: string): Observable<UploadProgress> {
return this.initUpload(initUrl, file, id).pipe(
switchMap(result => {
if (isAlreadyUploaded(result)) {
return of({ status: 'completed', bytes: file.size, total: file.size, result } as UploadProgress);
}
return this.uploadQueue$.pipe(
startWith(file),
filter(queuedFile => queuedFile === file),
switchMap(() => this.uploadChunks(`upload/${result.id}`, file, result.offset)),
map(progress => {
result.offset = progress.bytes;
return { ...progress, result };
})
);
})
);
}

private initUpload(initUrl: string, file: File, id?: string): Observable<Attachment> {
return this.api.post<Attachment>(initUrl, fileToAttachment(file, id));
}

private uploadChunks(url: string, file: File, beginFrom = 0): Observable<UploadProgress> {
return of({ status: 'uploading', bytes: beginFrom, total: file.size, chunkBytes: 0 } as UploadProgress).pipe(
filter(progress => !progress.chunkBytes),
distinctUntilChanged((previous, current) => previous.bytes !== current.bytes && current.bytes > 0),
expand(progress => {
if (!!progress.chunkBytes || progress.bytes >= file.size) {
return EMPTY;
}
const fileSlice = file.slice(progress.bytes, Math.min(progress.bytes + config.chunkSize, file.size));
const chunkSize = fileSlice.size;
return from(fileSlice.arrayBuffer()).pipe(
switchMap(chunk => {
return this.api.patchBytes(url, chunk, progress.bytes, chunkSize);
}),
filter(event => event.type === HttpEventType.Response || event.type === HttpEventType.UploadProgress),
map(event => {
if (event.type === HttpEventType.Response) {
return {
status: 'uploading',
bytes: progress.bytes + (event.loaded || chunkSize) - (progress.chunkBytes || 0),
total: file.size,
chunkBytes: 0,
} as UploadProgress;
} else {
return {
status: 'uploading',
bytes: progress.bytes + (event.loaded || 0) - (progress.chunkBytes || 0),
total: file.size,
chunkBytes: event.loaded || 0,
} as UploadProgress;
}
})
);
}),
takeWhile(progress => progress.bytes < file.size || !!progress.chunkBytes, true),
endWith({ status: 'completed', bytes: file.size, total: file.size } as UploadProgress)
);
}

// A tu funkcja z serwisu API
patchBytes(url: string, body: ArrayBuffer, offset: number, size: number): Observable<any> {
const resourceUrl = environment.servicesUrl + '/' + url;
const headers = new HttpHeaders({ 'Content-Type': 'application/octet-stream' });
return this.http.patch(resourceUrl, body, {
headers,
params: { offset, size },
reportProgress: true,
observe: 'events',
});
}
  • Odpowiedz
  • Otrzymuj powiadomienia
    o nowych komentarzach