Fiszki Online RxJS (Preview)
Darmowy podgląd 15 z 40 dostępnych pytań
Podstawy RxJS
Czym jest RxJS i jakie problemy rozwiązuje w aplikacjach JavaScript?
Odpowiedź w 30 sekund: RxJS (Reactive Extensions for JavaScript) to biblioteka do programowania reaktywnego wykorzystująca strumienie danych (Observable). Rozwiązuje problemy związane z zarządzaniem asynchronicznością, złożonymi przepływami danych i koordynacją wielu źródeł zdarzeń w aplikacjach JavaScript.
Odpowiedź w 2 minuty: RxJS to potężna biblioteka implementująca wzorzec Observer i programowanie reaktywne w JavaScript. Pozwala traktować wszystkie asynchroniczne operacje - od kliknięć użytkownika, przez żądania HTTP, po timery - jako ujednolicone strumienie danych, które można komponować, transformować i łączyć.
Główne problemy, które rozwiązuje RxJS to: zarządzanie callback hell poprzez deklaratywne operatory, łatwe anulowanie asynchronicznych operacji (subscriptions), elegancka obsługa błędów w złożonych przepływach danych oraz zaawansowane scenariusze jak debouncing, throttling, retry logic czy łączenie wielu źródeł danych. RxJS jest szczególnie popularna w ekosystemie Angular, ale sprawdza się w każdej aplikacji JavaScript wymagającej zaawansowanego zarządzania asynchronicznością.
Biblioteka oferuje ponad 100 operatorów pozwalających na transformację, filtrowanie, łączenie i kontrolę strumieni danych w sposób funkcyjny i deklaratywny. Dzięki temu kod staje się bardziej czytelny, testowalny i łatwiejszy w utrzymaniu niż tradycyjne podejścia z callbackami czy Promise.
Przykład kodu:
import { fromEvent } from 'rxjs';
import { debounceTime, map, distinctUntilChanged } from 'rxjs/operators';
// Pole wyszukiwania z opóźnionym wysyłaniem zapytań
const searchInput = document.getElementById('search');
fromEvent(searchInput, 'input')
.pipe(
map((event: any) => event.target.value), // Wyciągnij wartość
debounceTime(300), // Czekaj 300ms po ostatnim wpisie
distinctUntilChanged() // Ignoruj jeśli wartość się nie zmieniła
)
.subscribe(searchTerm => {
console.log('Szukam:', searchTerm);
// Tutaj wywołaj API z wyszukiwaniem
});
graph LR
A[Zdarzenia input] --> B[map - wyciągnij wartość]
B --> C[debounceTime - opóźnienie]
C --> D[distinctUntilChanged - unikalne]
D --> E[subscribe - wywołaj API]
Materiały
- Oficjalna dokumentacja RxJS
- RxJS Marbles - interaktywne diagramy
- Learn RxJS - przewodnik z przykładami
Czym jest Observable i czym różni się od Promise?
Odpowiedź w 30 sekund: Observable to strumień danych, który może emitować wiele wartości w czasie, obsługuje leniwą ewaluację i pozwala na anulowanie. Promise reprezentuje pojedynczą wartość asynchroniczną, wykonuje się od razu i nie można go anulować.
Odpowiedź w 2 minuty: Observable to podstawowy typ w RxJS reprezentujący strumień danych, który może emitować zero, jedną lub wiele wartości w czasie. Observable jest "leniwy" (lazy) - kod wewnątrz nie wykonuje się dopóki ktoś się nie zasubskrybuje. Każda subskrypcja tworzy niezależne wykonanie (dla cold Observable).
Kluczowe różnice względem Promise:
- Ilość wartości: Promise zwraca dokładnie jedną wartość (lub błąd), Observable może emitować wiele wartości przez cały okres życia
- Ewaluacja: Promise rozpoczyna wykonanie natychmiast po utworzeniu (eager), Observable tylko po subskrypcji (lazy)
- Anulowanie: Promise nie może być anulowany po rozpoczęciu, Observable można w każdej chwili anulować poprzez unsubscribe
- Operatory: Observable oferuje bogaty zestaw operatorów do transformacji (map, filter, merge itp.), Promise ma tylko then/catch/finally
Observable świetnie sprawdza się do obsługi WebSocketów, zdarzeń UI, intervalów czy dowolnych strumieni danych zmieniających się w czasie. Promise lepiej pasuje do pojedynczych operacji HTTP gdzie potrzebujesz tylko jednej odpowiedzi.
Przykład kodu:
// PROMISE - wykonuje się natychmiast, jedna wartość
const promise = new Promise((resolve) => {
console.log('Promise wykonany!'); // Loguje się od razu
setTimeout(() => resolve('Wynik'), 1000);
});
promise.then(value => console.log(value));
// Nie można anulować!
// OBSERVABLE - leniwy, wiele wartości, można anulować
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
console.log('Observable wykonany!'); // Loguje się tylko po subscribe
let count = 0;
const interval = setInterval(() => {
subscriber.next(count++); // Emituj wiele wartości
}, 1000);
// Funkcja czyszcząca - wykonana przy unsubscribe
return () => {
console.log('Anulowano!');
clearInterval(interval);
};
});
const subscription = observable.subscribe(value => console.log(value));
// Anuluj po 5 sekundach
setTimeout(() => subscription.unsubscribe(), 5000);
graph TD
A[Promise] --> B[Eager - wykonuje się od razu]
A --> C[Jedna wartość]
A --> D[Nie można anulować]
E[Observable] --> F[Lazy - wykonuje się po subscribe]
E --> G[Wiele wartości w czasie]
E --> H[Można anulować - unsubscribe]
Materiały
↑ Powrót na góręJak działa subskrypcja (Subscription) i dlaczego ważne jest jej anulowanie?
Odpowiedź w 30 sekund: Subscription reprezentuje wykonanie Observable i zwraca obiekt z metodą unsubscribe(). Anulowanie subskrypcji jest kluczowe aby uniknąć wycieków pamięci, niepotrzebnych operacji i błędów w aplikacji gdy komponenty/obiekty przestają istnieć.
Odpowiedź w 2 minuty: Subscription to obiekt zwracany przez metodę subscribe() reprezentujący trwające wykonanie Observable. Zawiera metodę unsubscribe(), która pozwala przerwać strumień danych i wykonać cleanup. Gdy wywołasz subscribe(), Observable rozpoczyna emitowanie wartości do przekazanych funkcji callback (next, error, complete).
Anulowanie subskrypcji jest krytyczne z kilku powodów:
- Wycieki pamięci: Nieodsubskrybowane Observable (np. interwały, WebSockety) działają w tle zajmując pamięć nawet gdy nie są już potrzebne
- Niepotrzebne operacje: Zapytania HTTP, timery czy event listenery działają niepotrzebnie zużywając zasoby
- Błędy w aplikacji: Aktualizacje usuniętych komponentów (w React/Angular) prowadzą do błędów "cannot set state of unmounted component"
- Problemy z wydajnością: Wielokrotne subskrypcje bez cleanup mogą drastycznie spowolnić aplikację
W Angular możesz użyć AsyncPipe który automatycznie anuluje subskrypcje. W innych frameworkach używaj wzorca przechowywania subskrypcji i anulowania ich w lifecycle hooks (useEffect cleanup, ngOnDestroy, componentWillUnmount).
Przykład kodu:
import { interval, Subscription } from 'rxjs';
// Pojedyncza subskrypcja
const subscription = interval(1000).subscribe(value => {
console.log('Timer:', value);
});
// Anuluj po 5 sekundach
setTimeout(() => {
subscription.unsubscribe();
console.log('Subskrypcja anulowana');
}, 5000);
// Zarządzanie wieloma subskrypcjami
class UserDashboard {
private subscriptions = new Subscription();
init() {
// Dodaj wiele subskrypcji do jednego kontenera
this.subscriptions.add(
interval(1000).subscribe(x => console.log('Timer 1:', x))
);
this.subscriptions.add(
interval(2000).subscribe(x => console.log('Timer 2:', x))
);
}
destroy() {
// Anuluj wszystkie subskrypcje jednocześnie
this.subscriptions.unsubscribe();
console.log('Wszystkie subskrypcje anulowane');
}
}
// Przykład z React Hook
import { useEffect } from 'react';
function MyComponent() {
useEffect(() => {
const subscription = interval(1000).subscribe(x => {
console.log('Wartość:', x);
});
// Cleanup - wykonany gdy komponent się odmontuje
return () => subscription.unsubscribe();
}, []);
return <div>Komponent z Observable</div>;
}
sequenceDiagram
participant Component
participant Observable
participant Subscription
Component->>Observable: subscribe()
Observable->>Subscription: return Subscription
Observable->>Component: next(value1)
Observable->>Component: next(value2)
Component->>Subscription: unsubscribe()
Subscription->>Observable: cleanup()
Note over Observable: Zatrzymuje emitowanie
Materiały
↑ Powrót na góręCzym jest Observer i jakie metody udostępnia (next, error, complete)?
Odpowiedź w 30 sekund: Observer to obiekt z metodami callback definiującymi jak reagować na emitowane wartości. Zawiera trzy metody: next(value) - obsługa nowej wartości, error(err) - obsługa błędu, complete() - zakończenie strumienia.
Odpowiedź w 2 minuty: Observer to obiekt implementujący interfejs z trzema opcjonalnymi metodami callback, które określają jak konsument reaguje na notyfikacje z Observable. Jest to implementacja wzorca Observer, gdzie Observable to subject (podmiot), a Observer to obserwator reagujący na zmiany.
Trzy metody Observer:
- next(value): Wywoływana za każdym razem gdy Observable emituje wartość. Może być wywołana 0 lub więcej razy. To główna metoda obsługująca strumień danych.
- error(error): Wywoływana gdy wystąpi błąd w Observable. Po wywołaniu error strumień się kończy i nie będzie więcej emisji (ani next, ani complete). Wywołana maksymalnie raz.
- complete(): Sygnalizuje zakończenie strumienia bez błędu. Po complete nie będzie już żadnych emisji. Wywołana maksymalnie raz. Niektóre Observable nigdy się nie kończą (np. interval).
Możesz przekazać Observer jako obiekt lub jako osobne funkcje do subscribe(). Wszystkie metody są opcjonalne - możesz np. obsłużyć tylko next ignorując error i complete.
Przykład kodu:
import { Observable } from 'rxjs';
// Observable emitujący wartości
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
// Symulacja błędu (odkomentuj aby przetestować)
// subscriber.error(new Error('Coś poszło nie tak!'));
subscriber.complete();
// To nie zostanie wysłane - po complete nic nie emituje
subscriber.next(4);
});
// Observer jako obiekt
const observer = {
next: (value: number) => {
console.log('Otrzymano wartość:', value);
},
error: (err: Error) => {
console.error('Wystąpił błąd:', err.message);
},
complete: () => {
console.log('Strumień zakończony!');
}
};
observable.subscribe(observer);
// Alternatywnie - przekaż funkcje bezpośrednio
observable.subscribe(
value => console.log('Next:', value), // next
err => console.error('Error:', err), // error
() => console.log('Complete!') // complete
);
// Lub tylko next (pozostałe opcjonalne)
observable.subscribe(value => console.log(value));
// Praktyczny przykład - zapytanie HTTP
import { ajax } from 'rxjs/ajax';
ajax.getJSON('https://api.example.com/users').subscribe({
next: users => {
console.log('Pobrano użytkowników:', users);
// Zaktualizuj UI
},
error: err => {
console.error('Błąd pobierania:', err);
// Pokaż komunikat błędu użytkownikowi
},
complete: () => {
console.log('Zapytanie zakończone');
// Ukryj loader
}
});
stateDiagram-v2
[*] --> Active: subscribe()
Active --> Active: next(value)
Active --> Error: error(err)
Active --> Complete: complete()
Error --> [*]
Complete --> [*]
note right of Active
Może emitować wiele
wartości przez next()
end note
note right of Error
Terminal state
Kończy strumień
end note
note right of Complete
Terminal state
Kończy strumień
end note
Materiały
↑ Powrót na góręJaka jest różnica między cold i hot Observable?
Odpowiedź w 30 sekund: Cold Observable tworzy nowe, niezależne wykonanie dla każdej subskrypcji (unicast) - każdy subscriber dostaje własny strumień od początku. Hot Observable współdzieli jedno wykonanie między wszystkich subscribers (multicast) - wszyscy dostają te same wartości w tym samym czasie.
Odpowiedź w 2 minuty: Cold i Hot Observable różnią się sposobem tworzenia i współdzielenia strumienia danych między subskrybentami.
Cold Observable (unicast):
- Tworzy nowe wykonanie dla każdej subskrypcji
- Każdy subscriber otrzymuje własną, niezależną kopię wartości
- Strumień rozpoczyna się od początku dla każdego nowego subscribera
- Przykłady: HTTP requests, timery, of(), from(), interval()
- Producent danych jest tworzony wewnątrz Observable
Hot Observable (multicast):
- Współdzieli jedno wykonanie między wszystkich subscribers
- Wszyscy subskrybenci otrzymują te same wartości w tym samym czasie
- Nowy subscriber otrzymuje tylko wartości emitowane PO jego subskrypcji
- Przykłady: DOM events, WebSocket streams, Subject
- Producent danych istnieje niezależnie od Observable
Możesz przekształcić Cold Observable w Hot używając operatorów share(), shareReplay() lub Subject. Jest to przydatne gdy chcesz uniknąć wielokrotnego wykonywania kosztownych operacji (np. HTTP requests) dla każdego subscribera.
Przykład kodu:
import { Observable, interval } from 'rxjs';
import { share, take } from 'rxjs/operators';
// COLD OBSERVABLE - każdy subscriber dostaje własny strumień
const coldObservable = new Observable(subscriber => {
console.log('Tworzę nowego producenta!');
const random = Math.random();
subscriber.next(random);
subscriber.complete();
});
console.log('=== COLD OBSERVABLE ===');
coldObservable.subscribe(x => console.log('Subscriber 1:', x));
coldObservable.subscribe(x => console.log('Subscriber 2:', x));
// Wyświetli dwie różne losowe liczby - każdy ma własny strumień
// HOT OBSERVABLE - wszyscy współdzielą ten sam strumień
const hotObservable = coldObservable.pipe(share());
console.log('=== HOT OBSERVABLE ===');
hotObservable.subscribe(x => console.log('Subscriber 1:', x));
hotObservable.subscribe(x => console.log('Subscriber 2:', x));
// Wyświetli tę samą losową liczbę - współdzielony strumień
// Praktyczny przykład - HTTP request
import { ajax } from 'rxjs/ajax';
import { shareReplay } from 'rxjs/operators';
// Cold - każdy subscribe wywoła nowe HTTP request
const coldHttp = ajax.getJSON('https://api.example.com/users');
coldHttp.subscribe(users => console.log('Request 1:', users.length));
coldHttp.subscribe(users => console.log('Request 2:', users.length));
// Wysłane 2 oddzielne requesty!
// Hot z cache - jeden request, współdzielony wynik
const hotHttp = ajax.getJSON('https://api.example.com/users').pipe(
shareReplay(1) // Cache ostatniej wartości dla nowych subscribers
);
hotHttp.subscribe(users => console.log('Request 1:', users.length));
hotHttp.subscribe(users => console.log('Request 2:', users.length));
// Tylko 1 request! Drugi subscriber dostaje cache'owaną wartość
// Subject - naturalnie hot
import { Subject } from 'rxjs';
const subject = new Subject();
subject.subscribe(x => console.log('Sub 1:', x));
subject.next(1); // Oba dostaną 1
subject.subscribe(x => console.log('Sub 2:', x));
subject.next(2); // Oba dostaną 2
subject.next(3); // Oba dostaną 3
graph TD
subgraph "Cold Observable - Unicast"
A[Observable] --> B[Subscribe 1]
A --> C[Subscribe 2]
B --> D[Wykonanie 1: values 1,2,3]
C --> E[Wykonanie 2: values 1,2,3]
end
subgraph "Hot Observable - Multicast"
F[Observable] --> G[Wspólne wykonanie]
G --> H[Subscribe 1]
G --> I[Subscribe 2]
H --> J[Dostaje: values 1,2,3]
I --> K[Dostaje: values 2,3 - po subskrypcji]
end
Materiały
- Hot vs Cold Observables - RxJS
- Understanding Hot vs Cold - Medium
- Multicasting operators - Learn RxJS
Operatory Transformacji RxJS
Jak działa operator map() i kiedy go używać?
Odpowiedź w 30 sekund:
Operator map() transformuje każdą wartość emitowaną przez Observable, stosując do niej podaną funkcję. Jest to odpowiednik metody Array.map() w RxJS - pobiera wartość, przekształca ją i emituje wynik.
Odpowiedź w 2 minuty:
Operator map() jest jednym z najbardziej podstawowych i najczęściej używanych operatorów transformacji w RxJS. Działa synchronicznie - dla każdej wartości emitowanej przez źródłowy Observable, map() natychmiast aplikuje podaną funkcję transformacji i emituje wynik.
Używamy map() gdy potrzebujemy przekształcić dane "jeden do jednego" - każda wartość wejściowa produkuje dokładnie jedną wartość wyjściową. Typowe przypadki użycia to: wyciąganie konkretnych pól z obiektów, konwersja typów danych, formatowanie wartości, wykonywanie obliczeń na wartościach czy mapowanie odpowiedzi HTTP do modeli domenowych.
Kluczowa różnica w porównaniu do operatorów wyższego rzędu (jak mergeMap, switchMap) polega na tym, że funkcja przekazana do map() musi zwracać zwykłą wartość, a nie Observable. Jeśli zwrócimy Observable, otrzymamy Observable zagnieżdżony wewnątrz Observable, co zazwyczaj nie jest tym czego chcemy.
Operator map() zachowuje strukturę strumienia - nie wpływa na timing emisji, nie dodaje ani nie usuwa wartości, jedynie je przekształca. Jest całkowicie synchroniczny i deterministyczny, co czyni go bezpiecznym i przewidywalnym w użyciu.
Przykład kodu:
import { of, fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';
// Przykład 1: Transformacja prostych wartości
const numbers$ = of(1, 2, 3, 4, 5);
const doubled$ = numbers$.pipe(
map(x => x * 2)
);
doubled$.subscribe(val => console.log(val)); // 2, 4, 6, 8, 10
// Przykład 2: Wyciąganie pól z obiektów
interface User {
id: number;
name: string;
email: string;
}
const users$ = of<User>(
{ id: 1, name: 'Anna', email: 'anna@example.com' },
{ id: 2, name: 'Jan', email: 'jan@example.com' }
);
const userNames$ = users$.pipe(
map(user => user.name)
);
userNames$.subscribe(name => console.log(name)); // 'Anna', 'Jan'
// Przykład 3: Transformacja zdarzeń DOM
const clicks$ = fromEvent<MouseEvent>(document, 'click');
const clickPositions$ = clicks$.pipe(
map(event => ({ x: event.clientX, y: event.clientY }))
);
clickPositions$.subscribe(pos => console.log(`Kliknięto w: ${pos.x}, ${pos.y}`));
// Przykład 4: Mapowanie odpowiedzi HTTP
interface ApiResponse {
data: { users: User[] };
status: number;
}
const apiResponse$ = of<ApiResponse>({
data: { users: [{ id: 1, name: 'Anna', email: 'anna@example.com' }] },
status: 200
});
const extractedUsers$ = apiResponse$.pipe(
map(response => response.data.users)
);
Materiały
↑ Powrót na góręOperatory Filtrujące w RxJS
21. Jaka jest różnica między debounceTime() a throttleTime()?
Odpowiedź w 30 sekund:
debounceTime() czeka aż przez określony czas nie pojawi się nowa wartość, a następnie emituje ostatnią wartość. throttleTime() emituje pierwszą wartość, ignoruje kolejne przez określony czas, potem cykl się powtarza. Debounce "spokój po burzy", throttle "ograniczenie częstotliwości".
Odpowiedź w 2 minuty:
debounceTime() i throttleTime() to dwa kluczowe operatory do kontrolowania częstotliwości emisji wartości, ale działają na odmiennych zasadach i są używane w różnych scenariuszach.
debounceTime(dueTime) czeka określony czas (dueTime w milisekundach) od ostatniej emitowanej wartości. Jeśli w tym czasie pojawi się nowa wartość, timer jest resetowany. Dopiero gdy przez pełny okres dueTime nie pojawi się żadna nowa wartość, operator emituje ostatnią otrzymaną wartość. To sprawia, że idealnie nadaje się do obsługi pól wyszukiwania - chcemy poczekać aż użytkownik skończy pisać, zanim wyślemy zapytanie do API. Każde naciśnięcie klawisza resetuje timer, więc zapytanie zostanie wysłane dopiero gdy użytkownik przestanie pisać.
throttleTime(duration, config?) działa na zasadzie "przepustnicy" - gdy pojawi się wartość, jest ona natychmiast emitowana, a następnie przez określony czas (duration) wszystkie kolejne wartości są ignorowane. Po upływie tego czasu operator znów może przepuścić wartość. Domyślnie emitowana jest pierwsza wartość w oknie czasowym (leading edge), ale można to zmienić konfiguracją. Jest idealny do ograniczania częstotliwości zdarzeń ciągłych jak scroll czy resize - chcemy reagować regularnie, ale nie na każde pojedyncze zdarzenie.
Kluczowa różnica: debounceTime czeka na "ciszę" w strumieniu zdarzeń, podczas gdy throttleTime gwarantuje maksymalną częstotliwość emisji. W przypadku ciągłego strumienia zdarzeń (np. scroll), debounceTime może w ogóle nic nie wyemitować do momentu zatrzymania, podczas gdy throttleTime będzie emitować wartości w regularnych odstępach czasu.
Przykład kodu:
import { fromEvent } from 'rxjs';
import { debounceTime, throttleTime, map } from 'rxjs';
// 1. debounceTime - pole wyszukiwania
const searchInput = document.querySelector('#search') as HTMLInputElement;
const search$ = fromEvent(searchInput, 'input');
search$.pipe(
map(event => (event.target as HTMLInputElement).value),
debounceTime(500) // Czekaj 500ms od ostatniego naciśnięcia klawisza
).subscribe(searchTerm => {
console.log('Wysyłam zapytanie dla:', searchTerm);
// API call tutaj - tylko gdy użytkownik przestanie pisać
});
// 2. throttleTime - zdarzenie scroll
const scroll$ = fromEvent(window, 'scroll');
scroll$.pipe(
throttleTime(1000) // Maksymalnie jedna emisja na sekundę
).subscribe(() => {
console.log('Sprawdzam pozycję scrolla');
// Obliczenia związane ze scrollem
});
// 3. Porównanie bezpośrednie
const clicks$ = fromEvent(document, 'click');
// Debounce - emituje po 1s od ostatniego kliknięcia
clicks$.pipe(
debounceTime(1000)
).subscribe(() => {
console.log('DEBOUNCE: Minęła sekunda od ostatniego kliknięcia');
});
// Throttle - emituje pierwsze kliknięcie, potem ignoruje przez 1s
clicks$.pipe(
throttleTime(1000)
).subscribe(() => {
console.log('THROTTLE: Kliknięcie (następne zignorowane przez 1s)');
});
// 4. throttleTime z konfiguracją
import { asyncScheduler } from 'rxjs';
clicks$.pipe(
throttleTime(1000, asyncScheduler, {
leading: true, // Emituj pierwszą wartość (domyślnie true)
trailing: true // Emituj ostatnią wartość w oknie (domyślnie false)
})
).subscribe(() => {
console.log('THROTTLE z trailing: kliknięcie');
});
// 5. Praktyczny przykład - formularz autosave
import { distinctUntilChanged } from 'rxjs';
const formInput$ = fromEvent(
document.querySelector('#form') as HTMLFormElement,
'input'
);
formInput$.pipe(
map(() => {
const form = document.querySelector('#form') as HTMLFormElement;
return new FormData(form);
}),
debounceTime(2000), // Zapisz 2s po ostatniej zmianie
distinctUntilChanged() // Tylko jeśli wartość się zmieniła
).subscribe(formData => {
console.log('Autozapis formularza');
// Zapisz formularz do localStorage lub API
});
// 6. Resize window z throttle
const resize$ = fromEvent(window, 'resize');
resize$.pipe(
throttleTime(500), // Maksymalnie co 500ms
map(() => ({
width: window.innerWidth,
height: window.innerHeight
}))
).subscribe(dimensions => {
console.log('Nowy rozmiar okna:', dimensions);
// Przelicz layout
});
// 7. Różnica w zachowaniu przy ciągłym strumieniu
import { interval } from 'rxjs';
const rapidStream$ = interval(100); // Co 100ms
// Debounce - nic nie emituje, bo strumień nigdy się nie kończy!
rapidStream$.pipe(
debounceTime(500)
).subscribe(num => {
console.log('Debounce:', num); // NIE WYEMITUJE NIC
});
// Throttle - emituje co 500ms
rapidStream$.pipe(
throttleTime(500)
).subscribe(num => {
console.log('Throttle:', num); // Co ~500ms: 4, 9, 14, 19...
});
Diagram Mermaid - Różnica między debounceTime a throttleTime:
gantt
title Porównanie debounceTime vs throttleTime
dateFormat X
axisFormat %L
section Zdarzenia źródłowe
Klik 1 :milestone, 0, 0
Klik 2 :milestone, 200, 200
Klik 3 :milestone, 400, 400
Klik 4 :milestone, 1500, 1500
Klik 5 :milestone, 1700, 1700
section debounceTime(500)
Czekanie :active, 0, 900
Emit (Klik 3) :crit, milestone, 900, 900
Czekanie :active, 1500, 2200
Emit (Klik 5) :crit, milestone, 2200, 2200
section throttleTime(500)
Emit (Klik 1) :crit, milestone, 0, 0
Blokada :active, 0, 500
Emit (Klik 3) :crit, milestone, 500, 500
Blokada :active, 500, 1000
Emit (Klik 4) :crit, milestone, 1500, 1500
Blokada :active, 1500, 2000
Materiały:
- RxJS debounceTime() - Dokumentacja
- RxJS throttleTime() - Dokumentacja
- Learn RxJS - debounceTime vs throttleTime
- Debounce vs Throttle - CSS Tricks
Subjects i Multicasting - RxJS
Czym jest Subject i czym różni się od zwykłego Observable?
Odpowiedź w 30 sekund: Subject to specjalny typ Observable, który działa zarówno jako Observable (można go subskrybować), jak i Observer (można do niego wysyłać wartości). W przeciwieństwie do zwykłego Observable, Subject jest multicast - wszystkie subskrypcje dzielą się tym samym wykonaniem i otrzymują te same wartości.
Odpowiedź w 2 minuty:
Subject w RxJS to hybrydowa struktura łącząca cechy Observable i Observer. Jako Observable można go subskrybować, ale jako Observer można do niego aktywnie wysyłać wartości metodą next(). To czyni go idealnym mostem między imperatywnym a reaktywnym stylem programowania.
Kluczowa różnica między Subject a zwykłym Observable polega na charakterze emisji. Zwykły Observable jest unicast - każda subskrypcja tworzy niezależne wykonanie producenta wartości. Subject natomiast jest multicast - wszystkie subskrypcje słuchają tego samego źródła i otrzymują identyczne wartości w tym samym czasie.
Subject nie przechowuje wartości - nowi subskrybenci nie otrzymują wartości emitowanych przed ich subskrypcją. Jest jak radio nadające na żywo - słuchasz tylko tego, co jest transmitowane w momencie, gdy włączysz odbiornik.
Subjects są często używane do zarządzania stanem w aplikacjach, przekształcania callbacków w Observable, lub jako EventBus w architekturze aplikacji. Pozwalają na programowanie reaktywne tam, gdzie bezpośredni dostęp do Observable jest niemożliwy lub niewygodny.
Przykład kodu:
import { Observable, Subject } from 'rxjs';
// Zwykły Observable - unicast
const observable = new Observable(subscriber => {
console.log('Nowe wykonanie Observable');
subscriber.next(Math.random());
});
observable.subscribe(val => console.log('Sub A:', val));
observable.subscribe(val => console.log('Sub B:', val));
// Wynik:
// Nowe wykonanie Observable
// Sub A: 0.123
// Nowe wykonanie Observable
// Sub B: 0.456
// Każda subskrypcja otrzymuje inną wartość!
// Subject - multicast
const subject = new Subject<number>();
subject.subscribe(val => console.log('Sub A:', val));
subject.subscribe(val => console.log('Sub B:', val));
console.log('Emituję wartość');
subject.next(Math.random());
// Wynik:
// Emituję wartość
// Sub A: 0.789
// Sub B: 0.789
// Obie subskrypcje otrzymują tę samą wartość!
// Subject jako Observer
const dataObservable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
});
const dataSubject = new Subject<number>();
dataSubject.subscribe(val => console.log('Otrzymano:', val));
// Subject subskrybuje Observable
dataObservable.subscribe(dataSubject);
// Wynik: Otrzymano: 1, 2, 3
Materiały
- RxJS Subject - Dokumentacja oficjalna
- Understanding RxJS Subjects
- The magic of RxJS sharing operators
Operatory Tworzące w RxJS
6. Jak używać operatora of() do tworzenia Observable z wartości?
Odpowiedź w 30 sekund:
Operator of() tworzy Observable, który emituje podane wartości po kolei, a następnie od razu się kończy (complete). Jest idealny do tworzenia prostych strumieni z wcześniej znanych wartości lub do testowania.
Odpowiedź w 2 minuty:
Operator of() to jeden z najprostszych operatorów tworzących w RxJS. Przyjmuje dowolną liczbę argumentów i tworzy Observable, który emituje każdą z tych wartości synchronicznie, jedna po drugiej, a następnie wywołuje complete(). Jest to bardzo przydatne gdy chcemy opakować znane wartości w Observable, np. do testowania, zwracania stałych wartości z funkcji, lub łączenia z innymi operatorami.
W praktyce of() często używamy gdy musimy zwrócić Observable z funkcji, ale wartość jest już znana i nie wymaga asynchronicznego pobierania. Możemy przekazać wartości różnych typów - liczby, stringi, obiekty, tablice, funkcje. Każdy argument będzie osobną emisją. Jeśli chcemy wyemitować tablicę jako pojedynczą wartość, przekazujemy ją jako jeden argument.
Różnica między of([1, 2, 3]) a of(1, 2, 3) jest kluczowa: pierwsza wersja wyemituje jedną wartość (tablicę), druga wyemituje trzy oddzielne wartości. Operator of() jest synchroniczny - wszystkie emisje następują natychmiast, w tym samym cyklu zdarzeń.
Częste zastosowania to mockowanie danych w testach, tworzenie prostych strumieni do demonstracji działania operatorów, oraz sytuacje gdy chcemy ujednolicić API funkcji zwracających zarówno synchroniczne jak i asynchroniczne wartości.
Przykład kodu:
import { of } from 'rxjs';
// Podstawowe użycie - emituje 3 wartości i kończy
of(1, 2, 3).subscribe({
next: (value) => console.log('Wartość:', value),
complete: () => console.log('Zakończone!')
});
// Output: Wartość: 1, Wartość: 2, Wartość: 3, Zakończone!
// Emitowanie różnych typów danych
of('tekst', 42, { name: 'Jan' }, [1, 2, 3]).subscribe(
value => console.log(typeof value, value)
);
// Emitowanie tablicy jako pojedynczej wartości
of([1, 2, 3]).subscribe(arr => {
console.log('Tablica:', arr); // Output: Tablica: [1, 2, 3]
});
// Praktyczne użycie - mockowanie API
function getUserById(id: number): Observable<User> {
if (id === 1) {
// Zwracamy mockowe dane
return of({ id: 1, name: 'Jan Kowalski' });
}
// W rzeczywistości tutaj byłoby HTTP request
return ajax.getJSON(`/api/users/${id}`);
}
// Użycie w pipe do transformacji
of(5, 10, 15)
.pipe(
map(x => x * 2)
)
.subscribe(x => console.log(x)); // Output: 10, 20, 30
Materiały
↑ Powrót na górę7. Jak działa operator from() i jakie typy danych może konwertować?
Odpowiedź w 30 sekund:
Operator from() konwertuje różne typy danych do Observable: tablice, Promise, iteratory, Observable-like obiekty. W przeciwieństwie do of(), który traktuje tablicę jako jedną wartość, from() emituje każdy element tablicy osobno.
Odpowiedź w 2 minuty:
Operator from() jest bardzo uniwersalnym operatorem tworzącym, który potrafi przekonwertować wiele różnych typów danych na Observable. Najczęściej używamy go do konwersji tablic (emitując każdy element osobno), Promise (konwertując do Observable, który emituje wynik Promise), oraz obiektów iterowalnych jak Map, Set, String czy generatory.
Gdy przekażemy tablicę do from([1, 2, 3]), otrzymamy Observable emitujący trzy osobne wartości: 1, 2, 3, a następnie complete. To fundamentalna różnica w porównaniu do of([1, 2, 3]), który wyemitowałby całą tablicę jako jedną wartość. Dzięki temu from() świetnie nadaje się do przetwarzania kolekcji element po elemencie.
Promise są kolejnym częstym przypadkiem użycia. from(fetch('/api/data')) przekonwertuje Promise zwracane przez fetch na Observable. Gdy Promise się rozwiąże, Observable wyemituje wartość i zakończy się. Jeśli Promise zostanie odrzucone, Observable wyemituje błąd. To pozwala na łączenie kodu opartego na Promise z operatorami RxJS.
Operator from() obsługuje również obiekty Observable-like (z metodą subscribe), iteratory (obiekty z metodą Symbol.iterator), oraz inne struktury danych. Może obsłużyć string (emitując każdy znak osobno) oraz async iteratory. Ta uniwersalność czyni from() kluczowym narzędziem do integracji RxJS z różnymi źródłami danych w aplikacji.
Przykład kodu:
import { from } from 'rxjs';
import { map, take } from 'rxjs/operators';
// Konwersja tablicy - każdy element osobno
from([10, 20, 30, 40]).subscribe(
value => console.log('Element:', value)
);
// Output: Element: 10, Element: 20, Element: 30, Element: 40
// Konwersja Promise
const promise = new Promise(resolve => {
setTimeout(() => resolve('Dane z Promise'), 1000);
});
from(promise).subscribe(
value => console.log('Promise zwrócił:', value)
);
// Output (po 1s): Promise zwrócił: Dane z Promise
// Konwersja Map
const mapa = new Map([
['klucz1', 'wartość1'],
['klucz2', 'wartość2']
]);
from(mapa).subscribe(
([key, value]) => console.log(`${key}: ${value}`)
);
// Output: klucz1: wartość1, klucz2: wartość2
// Konwersja Set
from(new Set([1, 2, 2, 3, 3, 3])).subscribe(
value => console.log('Unikalna wartość:', value)
);
// Output: Unikalna wartość: 1, 2, 3
// Konwersja stringa
from('RxJS').subscribe(
char => console.log('Znak:', char)
);
// Output: Znak: R, Znak: x, Znak: J, Znak: S
// Generator function
function* numberGenerator() {
yield 1;
yield 2;
yield 3;
}
from(numberGenerator()).subscribe(
num => console.log('Z generatora:', num)
);
// Praktyczne użycie - fetch API z RxJS
from(fetch('https://api.example.com/data'))
.pipe(
switchMap(response => from(response.json())),
map(data => data.items)
)
.subscribe(items => console.log('Pobrane dane:', items));
// Różnica między from() i of()
console.log('--- Używając from() ---');
from([1, 2, 3]).subscribe(x => console.log(x)); // 1, 2, 3
console.log('--- Używając of() ---');
of([1, 2, 3]).subscribe(x => console.log(x)); // [1, 2, 3]
Materiały
↑ Powrót na górę8. Czym jest operator fromEvent() i jak obsługiwać zdarzenia DOM?
Odpowiedź w 30 sekund:
Operator fromEvent() konwertuje zdarzenia DOM (i inne event emitters) na Observable. Automatycznie dodaje event listener, emituje każde zdarzenie, i usuwa listener przy unsubscribe. Idealny do obsługi kliknięć, ruchów myszy, czy inputów użytkownika.
Odpowiedź w 2 minuty:
fromEvent() to kluczowy operator do pracy ze zdarzeniami w przeglądarce. Pozwala on przekształcić dowolne zdarzenie DOM w strumień Observable, co umożliwia zastosowanie całego zestawu operatorów RxJS do obsługi interakcji użytkownika. Operator przyjmuje dwa główne parametry: target (element DOM, window, document, lub dowolny EventTarget) oraz nazwę zdarzenia (np. 'click', 'mousemove', 'keyup').
Wielką zaletą fromEvent() jest automatyczne zarządzanie cyklem życia listenera. Kiedy tworzysz Observable za pomocą fromEvent(), listener jest dodawany dopiero gdy ktoś zasubskrybuje ten Observable. Co ważniejsze, gdy wywołasz unsubscribe(), listener zostanie automatycznie usunięty, co zapobiega wyciekom pamięci - częstemu problemowi przy ręcznym zarządzaniu event listenerami.
Observable stworzony przez fromEvent() nigdy się nie kończy sam - będzie emitował zdarzenia dopóki nie wywołasz unsubscribe lub dopóki element nie zostanie usunięty z DOM. Każda emisja to obiekt Event z przeglądarki, zawierający wszystkie informacje o zdarzeniu (target, timestamp, dane specyficzne dla typu zdarzenia).
W praktyce fromEvent() świetnie współgra z operatorami RxJS. Możesz użyć debounceTime() do ograniczenia częstotliwości zdarzeń (np. przy input), throttleTime() do kontrolowania rate'u (np. przy scroll), map() do wyciągnięcia konkretnych danych ze zdarzenia, czy filter() do warunkowej obsługi. To czyni kod obsługi zdarzeń znacznie bardziej czytelnym i deklaratywnym niż tradycyjne podejście z callback'ami.
Przykład kodu:
import { fromEvent } from 'rxjs';
import { map, debounceTime, throttleTime, filter, tap } from 'rxjs/operators';
// Podstawowa obsługa kliknięcia
const button = document.querySelector('#myButton');
const clicks$ = fromEvent(button, 'click');
clicks$.subscribe(event => {
console.log('Przycisk kliknięty!', event);
});
// Obsługa inputu z debounce (czeka aż użytkownik przestanie pisać)
const searchInput = document.querySelector('#searchInput');
const search$ = fromEvent(searchInput, 'input').pipe(
debounceTime(500), // Czeka 500ms po ostatnim znaku
map((event: Event) => (event.target as HTMLInputElement).value),
filter(text => text.length >= 3), // Minimum 3 znaki
tap(text => console.log('Szukam:', text))
);
search$.subscribe(searchTerm => {
// Wykonaj wyszukiwanie
performSearch(searchTerm);
});
// Obsługa ruchu myszy z throttle (limituje częstotliwość)
const mousemove$ = fromEvent(document, 'mousemove').pipe(
throttleTime(100), // Maksymalnie co 100ms
map((event: MouseEvent) => ({ x: event.clientX, y: event.clientY }))
);
mousemove$.subscribe(position => {
console.log(`Pozycja myszy: X=${position.x}, Y=${position.y}`);
});
// Obsługa scroll z pozycją
const scroll$ = fromEvent(window, 'scroll').pipe(
throttleTime(200),
map(() => window.scrollY)
);
scroll$.subscribe(scrollPosition => {
if (scrollPosition > 300) {
// Pokaż przycisk "scroll to top"
showScrollTopButton();
}
});
// Obsługa klawiszy - tylko Enter
const input = document.querySelector('#messageInput');
const enterKey$ = fromEvent(input, 'keyup').pipe(
filter((event: KeyboardEvent) => event.key === 'Enter'),
map((event: Event) => (event.target as HTMLInputElement).value)
);
enterKey$.subscribe(message => {
console.log('Wysłano wiadomość:', message);
sendMessage(message);
});
// Łączenie wielu zdarzeń - drag and drop
const element = document.querySelector('#draggable');
const mousedown$ = fromEvent(element, 'mousedown');
const mousemove$ = fromEvent(document, 'mousemove');
const mouseup$ = fromEvent(document, 'mouseup');
const drag$ = mousedown$.pipe(
switchMap(() => mousemove$.pipe(
takeUntil(mouseup$),
map((event: MouseEvent) => ({
x: event.clientX,
y: event.clientY
}))
))
);
drag$.subscribe(position => {
// Przesuń element
element.style.left = position.x + 'px';
element.style.top = position.y + 'px';
});
// Unsubscribe - ważne przy niszczeniu komponentów
const subscription = clicks$.subscribe(/* ... */);
// Później, np. w ngOnDestroy() w Angular
subscription.unsubscribe(); // Usuwa event listener
Diagram przepływu zdarzeń:
graph LR
A[DOM Event] --> B[fromEvent]
B --> C[Observable Stream]
C --> D[debounceTime/throttleTime]
D --> E[map/filter]
E --> F[subscribe]
F --> G[Handler Function]
style A fill:#e1f5ff
style C fill:#fff4e1
style F fill:#e7ffe1
Materiały
↑ Powrót na górę9. Jak używać operatora interval() i timer() do tworzenia strumieni czasowych?
Odpowiedź w 30 sekund:
interval(n) emituje liczby (0, 1, 2...) co n milisekund w nieskończoność. timer(delay, period) czeka delay milisekund, emituje 0, a potem opcjonalnie emituje kolejne wartości co period milisekund. Oba przydatne do polling, animacji i operacji czasowych.
Odpowiedź w 2 minuty:
Operatory interval() i timer() służą do tworzenia Observable bazujących na czasie, co jest niezwykle przydatne w wielu scenariuszach aplikacji. interval(n) to prostszy z nich - tworzy nieskończony strumień emitujący kolejne liczby całkowite (zaczynając od 0) w regularnych odstępach czasu. Na przykład interval(1000) będzie emitował 0, 1, 2, 3... co sekundę, aż do momentu unsubscribe.
timer() jest bardziej elastyczny i ma dwa tryby działania. Jako timer(delay) działa jak setTimeout - czeka określony czas i emituje pojedynczą wartość 0, po czym się kończy. Jako timer(delay, period) działa jak kombinacja setTimeout i setInterval - czeka delay milisekund, emituje 0, a potem emituje kolejne wartości (1, 2, 3...) co period milisekund. Możesz też użyć timer(0, 1000), co zadziała podobnie do interval(1000), ale pierwsza emisja nastąpi natychmiast.
Oba operatory świetnie współpracują z innymi operatorami RxJS. Możesz użyć take(n) aby ograniczyć liczbę emisji, takeUntil() aby zakończyć strumień na podstawie warunku, czy switchMap() aby uruchomić inną operację przy każdym ticku. Częste zastosowania to polling API (sprawdzanie nowych danych co X sekund), animacje (aktualizacja interfejsu co frame), liczniki czasu, timeout'y, oraz operacje wymagające opóźnienia.
Ważne jest aby pamiętać o unsubscribe przy tych operatorach, ponieważ będą działać w nieskończoność i mogą powodować wycieki pamięci. W frameworkach jak Angular, najlepiej używać ich z operatorem takeUntil() podpiętym do lifecycle hook'a komponentu (np. ngOnDestroy), lub korzystać z async pipe, który automatycznie zarządza subskrypcją.
Przykład kodu:
import { interval, timer } from 'rxjs';
import { take, takeUntil, map, switchMap, tap } from 'rxjs/operators';
// INTERVAL - podstawowe użycie
console.log('Start interval');
const interval$ = interval(1000); // Co 1 sekundę
interval$.pipe(take(5)).subscribe(
value => console.log('Interval emitował:', value)
);
// Output: 0, 1, 2, 3, 4 (co sekundę)
// TIMER - pojedyncza emisja (jak setTimeout)
console.log('Start timer - pojedyncza emisja');
timer(3000).subscribe(
() => console.log('Timer zakończony po 3 sekundach!')
);
// TIMER - z okresowymi emisjami (jak setInterval)
console.log('Start timer - okresowe emisje');
timer(2000, 1000).pipe(take(5)).subscribe(
value => console.log('Timer emitował:', value)
);
// Czeka 2s, potem emituje: 0, 1, 2, 3, 4 (co 1s)
// TIMER vs INTERVAL - różnica w pierwszej emisji
timer(0, 1000).pipe(take(3)).subscribe(
v => console.log('Timer z delay 0:', v)
);
// Output natychmiast: 0, potem 1, 2 (co 1s)
interval(1000).pipe(take(3)).subscribe(
v => console.log('Interval:', v)
);
// Output po 1s: 0, potem 1, 2 (co 1s)
// Praktyczne użycie 1: Polling API
const pollApi$ = interval(5000).pipe(
switchMap(() => fetch('/api/status').then(r => r.json())),
tap(data => console.log('Pobrano nowe dane:', data))
);
const pollSubscription = pollApi$.subscribe();
// Zatrzymaj polling po 30 sekundach
timer(30000).subscribe(() => {
pollSubscription.unsubscribe();
console.log('Polling zatrzymany');
});
// Praktyczne użycie 2: Licznik odliczający
const countdown$ = timer(0, 1000).pipe(
map(n => 10 - n),
take(11)
);
countdown$.subscribe(
seconds => console.log(`Pozostało: ${seconds}s`),
null,
() => console.log('Odliczanie zakończone!')
);
// Praktyczne użycie 3: Automatyczne odświeżanie tokenu
const tokenRefresh$ = timer(3600000, 3600000).pipe(
switchMap(() => refreshAuthToken())
);
tokenRefresh$.subscribe(
token => console.log('Token odświeżony:', token)
);
// Praktyczne użycie 4: Animacja z interwałem
const animation$ = interval(16).pipe( // ~60 FPS
take(100),
map(frame => frame / 100) // 0.0 do 1.0
);
animation$.subscribe(progress => {
const element = document.querySelector('#animated');
element.style.opacity = progress.toString();
});
// Praktyczne użycie 5: Retry z opóźnieniem
function fetchWithRetry(url: string) {
return fetch(url).pipe(
catchError(error => {
console.log('Błąd, retry za 3s...');
return timer(3000).pipe(
switchMap(() => fetch(url))
);
})
);
}
// Użycie z takeUntil (Angular pattern)
import { Subject } from 'rxjs';
class MyComponent {
private destroy$ = new Subject<void>();
ngOnInit() {
// Polling zatrzyma się automatycznie przy destroy
interval(1000).pipe(
takeUntil(this.destroy$),
tap(n => console.log('Tick:', n))
).subscribe();
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
Diagram czasowy:
gantt
title Porównanie interval() i timer()
dateFormat X
axisFormat %L ms
section interval(1000)
0: milestone, 1000, 0
1: milestone, 2000, 0
2: milestone, 3000, 0
3: milestone, 4000, 0
section timer(2000, 1000)
wait: 0, 2000
0: milestone, 2000, 0
1: milestone, 3000, 0
2: milestone, 4000, 0
section timer(3000)
wait: 0, 3000
0: milestone, 3000, 0
Materiały
- RxJS interval() - Dokumentacja oficjalna
- RxJS timer() - Dokumentacja oficjalna
- Learn RxJS - interval operator
- Learn RxJS - timer operator
Operatory Łączące w RxJS
Jaka jest różnica między merge() a concat()?
Odpowiedź w 30 sekund:
merge() subskrybuje wszystkie Observable jednocześnie i emituje wartości w miarę ich pojawiania się (równolegle), podczas gdy concat() subskrybuje je sekwencyjnie - czeka na zakończenie poprzedniego przed rozpoczęciem następnego. Kolejność emisji w merge() zależy od tego, który Observable emituje szybciej, a w concat() zachowuje kolejność źródeł.
Odpowiedź w 2 minuty:
merge() to operator, który tworzy wyjściowy Observable łączący wiele Observable poprzez równoczesną subskrypcję wszystkich z nich. Wartości są emitowane natychmiast, jak tylko którykolwiek ze źródłowych Observable je wyprodukuje. Jest to idealne rozwiązanie, gdy chcemy obsługiwać wiele niezależnych strumieni jednocześnie i zależy nam na szybkości reakcji.
concat() działa w sposób sekwencyjny - subskrybuje pierwszy Observable i czeka, aż się zakończy (complete), dopiero potem przechodzi do następnego. Gwarantuje to zachowanie kolejności emisji zgodnie z kolejnością przekazanych Observable. Jest to przydatne, gdy kolejność operacji ma znaczenie lub gdy chcemy uniknąć przeciążenia systemu równoczesnymi żądaniami.
Kluczowa różnica dotyczy również obsługi błędów: w merge() błąd w jednym Observable nie zatrzymuje pozostałych (chyba że użyjemy odpowiednich operatorów), podczas gdy w concat() błąd kończy całą sekwencję.
Wydajnościowo merge() jest szybszy, gdy strumienie są niezależne, ale concat() daje większą kontrolę i przewidywalność, szczególnie przy operacjach zależnych od siebie.
Przykład kodu:
import { merge, concat, interval, of } from 'rxjs';
import { take, map } from 'rxjs/operators';
// Źródłowe Observable
const fast$ = interval(500).pipe(
take(3),
map(x => `Szybki: ${x}`)
);
const slow$ = interval(1000).pipe(
take(3),
map(x => `Wolny: ${x}`)
);
// merge() - równoległa emisja
console.log('=== MERGE (równolegle) ===');
merge(fast$, slow$).subscribe(console.log);
// Wynik: Szybki: 0, Szybki: 1, Wolny: 0, Szybki: 2, Wolny: 1, Wolny: 2
// Wartości są przeplecione w zależności od czasu emisji
// concat() - sekwencyjna emisja
console.log('\n=== CONCAT (sekwencyjnie) ===');
concat(fast$, slow$).subscribe(console.log);
// Wynik: Szybki: 0, Szybki: 1, Szybki: 2, Wolny: 0, Wolny: 1, Wolny: 2
// Najpierw wszystkie wartości z fast$, potem z slow$
// Praktyczny przykład - żądania HTTP
const saveUser$ = of('Użytkownik zapisany').pipe(delay(1000));
const sendEmail$ = of('Email wysłany').pipe(delay(500));
const updateCache$ = of('Cache zaktualizowany').pipe(delay(300));
// concat() - dla operacji zależnych (zapisz -> wyślij -> zaktualizuj)
concat(saveUser$, sendEmail$, updateCache$).subscribe(
result => console.log(result)
);
// merge() - dla operacji niezależnych (wszystko jednocześnie)
merge(saveUser$, sendEmail$, updateCache$).subscribe(
result => console.log(result)
);
Materiały
- RxJS merge() - Oficjalna dokumentacja
- RxJS concat() - Oficjalna dokumentacja
- Learn RxJS - merge
- Learn RxJS - concat
Obsługa Błędów i Retry w RxJS
Jak obsługiwać błędy w strumieniach RxJS za pomocą catchError()?
Odpowiedź w 30 sekund:
Operator catchError() przechwytuje błędy w strumieniu Observable i pozwala na ich obsługę poprzez zwrócenie nowego Observable, wartości zastępczej lub przekazanie błędu dalej. Zapobiega on niespodziewanemu zakończeniu strumienia i umożliwia kontynuację działania aplikacji.
Odpowiedź w 2 minuty:
Operator catchError() jest kluczowym narzędziem do obsługi błędów w RxJS. Działa jako middleware, który przechwytuje błędy zanim dotrą one do subskrybenta. Przyjmuje funkcję callback z dwoma parametrami: błędem oraz oryginalnym Observable, co pozwala na różne strategie obsługi błędów.
Istnieje kilka typowych wzorców użycia catchError(). Po pierwsze, można zwrócić wartość zastępczą za pomocą of(), co pozwala na graceful degradation - aplikacja kontynuuje działanie z domyślną wartością. Po drugie, można zwrócić pusty Observable używając EMPTY, co skutkuje cichym zakończeniem strumienia bez emitowania wartości. Po trzecie, można ponownie wyrzucić błąd używając throwError(), co pozwala na transformację błędu przed przekazaniem go dalej.
Ważną cechą catchError() jest to, że po przechwyceniu błędu oryginalny strumień zostaje zakończony. Jeśli chcemy kontynuować subskrypcję po błędzie, musimy użyć catchError() wewnątrz operatorów jak mergeMap() lub wykorzystać wzorzec retry. Operator można również łączyć w łańcuch - wiele wywołań catchError() na różnych poziomach pozwala na wielopoziomową obsługę błędów.
Dobrą praktyką jest umieszczanie catchError() jak najbliżej źródła błędu, co pozwala na precyzyjną obsługę różnych typów błędów w różnych częściach pipeline'u. Możemy też używać catchError() na poziomie globalnym (na końcu pipeline'u) jako ostatniej linii obrony przed nieobsłużonymi błędami.
Przykład kodu:
import { of, throwError } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { catchError, map, mergeMap } from 'rxjs/operators';
// Przykład 1: Zwracanie wartości zastępczej
const bezpiecznePobranieUzytkownika$ = ajax.getJSON('https://api.example.com/user/123').pipe(
catchError(error => {
console.error('Błąd pobierania użytkownika:', error);
// Zwracamy domyślnego użytkownika
return of({ id: 0, name: 'Gość', email: '' });
})
);
// Przykład 2: Różne strategie obsługi błędów
const zaawansowanaObslugaBledow$ = ajax.getJSON('https://api.example.com/data').pipe(
map(response => response.data),
catchError(error => {
// Obsługa różnych typów błędów
if (error.status === 404) {
console.warn('Zasób nie znaleziony');
return of([]); // Zwracamy pustą tablicę
} else if (error.status === 401) {
console.error('Brak autoryzacji');
// Przekierowujemy do logowania
window.location.href = '/login';
return EMPTY; // Kończymy strumień bez emitowania wartości
} else {
// Inne błędy - transformujemy i rzucamy dalej
return throwError(() => new Error(`Błąd API: ${error.message}`));
}
})
);
// Przykład 3: Wielopoziomowa obsługa błędów
const wielopoziomowa$ = ajax.getJSON('https://api.example.com/users').pipe(
mergeMap(users =>
// Dla każdego użytkownika pobieramy szczegóły
ajax.getJSON(`https://api.example.com/users/${users[0].id}/details`).pipe(
// Obsługa błędu na poziomie pojedynczego żądania
catchError(error => {
console.error('Błąd pobierania szczegółów:', error);
return of({ details: 'niedostępne' });
})
)
),
// Obsługa błędu na poziomie całego strumienia
catchError(error => {
console.error('Błąd główny:', error);
return of({ users: [], error: true });
})
);
// Przykład 4: Użycie drugiego parametru (caught$)
const ponownaProba$ = ajax.getJSON('https://api.example.com/data').pipe(
catchError((error, caught$) => {
console.error('Błąd, ponawiam próbę...', error);
// Zwracamy oryginalny Observable, aby spróbować ponownie
// UWAGA: To może prowadzić do nieskończonej pętli!
// Lepiej użyć operatora retry()
return caught$;
})
);
// Przykład 5: CatchError z warunkiem
let licznikProb = 0;
const warunkowaObsluga$ = ajax.getJSON('https://api.example.com/unstable').pipe(
catchError((error, caught$) => {
licznikProb++;
if (licznikProb < 3) {
console.log(`Próba ${licznikProb} nieudana, ponawiam...`);
return caught$; // Próbujemy ponownie
} else {
console.error('Przekroczono limit prób');
return of({ error: 'Serwis tymczasowo niedostępny' });
}
})
);
Materiały
- RxJS catchError - Oficjalna dokumentacja
- Error Handling w RxJS - Learn RxJS
- RxJS Error Handling Guide
Najlepsze Praktyki i Wzorce RxJS
Jak unikać memory leaks przy pracy z RxJS w Angular?
Odpowiedź w 30 sekund:
Memory leaks w RxJS powstają gdy subskrypcje nie są anulowane po zniszczeniu komponentu. Należy zawsze czyścić subskrypcje używając operatora takeUntil z Subject, async pipe w szablonach, lub Angular 16+ DestroyRef. To zapewnia, że strumienie są prawidłowo zamykane i pamięć jest zwalniana.
Odpowiedź w 2 minuty:
Memory leaks w aplikacjach Angular z RxJS są jednym z najczęstszych problemów wydajnościowych. Występują gdy komponent zostaje zniszczony, ale jego subskrypcje nadal działają w tle, konsumując pamięć i wykonując niepotrzebne operacje. Główną przyczyną jest brak wywołania metody unsubscribe() na aktywnych subskrypcjach.
Istnieją trzy podstawowe wzorce zapobiegania memory leaks. Pierwszy to pattern takeUntil, gdzie używamy Subject jako sygnału zniszczenia komponentu i automatycznie kończymy wszystkie subskrypcje w ngOnDestroy. Drugi to wykorzystanie async pipe w szablonach, która automatycznie zarządza cyklem życia subskrypcji. Trzeci to nowy DestroyRef API dostępny od Angular 16+, który oferuje reaktywny sposób reagowania na zniszczenie komponentu.
Warto również rozróżnić które subskrypcje wymagają czyszczenia - Observable zwracane przez HttpClient automatycznie się kończą po otrzymaniu odpowiedzi, ale strumienie nieskończone jak interval(), fromEvent() czy BehaviorSubject wymagają ręcznego zarządzania. W przypadku Route params lub ActivatedRoute, Angular zarządza nimi automatycznie tylko dla tego samego komponentu, ale przy nawigacji do różnych komponentów warto używać takeUntil dla pewności.
Najlepszą praktyką jest konsekwentne stosowanie jednego wzorca w całej aplikacji i używanie ESLint z regułami rxjs/no-ignored-subscription oraz rxjs/no-unsafe-takeuntil do wykrywania potencjalnych problemów na etapie developmentu.
Przykład kodu:
import { Component, OnDestroy, OnInit, DestroyRef, inject } from '@angular/core';
import { Subject, interval, fromEvent } from 'rxjs';
import { takeUntil, take } from 'rxjs/operators';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
// PATTERN 1: takeUntil z Subject (tradycyjne podejście)
@Component({
selector: 'app-user-profile',
template: `
<div>
<h2>{{ userName }}</h2>
<p>Kliknięć: {{ clickCount }}</p>
</div>
`
})
export class UserProfileComponent implements OnInit, OnDestroy {
private destroy$ = new Subject<void>();
userName: string;
clickCount = 0;
constructor(private userService: UserService) {}
ngOnInit() {
// Automatyczne zakończenie wszystkich subskrypcji przy zniszczeniu
this.userService.getCurrentUser()
.pipe(takeUntil(this.destroy$))
.subscribe(user => this.userName = user.name);
// Nieskończony strumień wymaga takeUntil
interval(1000)
.pipe(takeUntil(this.destroy$))
.subscribe(val => console.log('Timer:', val));
// Event listener również wymaga czyszczenia
fromEvent(document, 'click')
.pipe(takeUntil(this.destroy$))
.subscribe(() => this.clickCount++);
}
ngOnDestroy() {
// Emitujemy sygnał zakończenia dla wszystkich subskrypcji
this.destroy$.next();
this.destroy$.complete();
}
}
// PATTERN 2: Async pipe (najlepsze rozwiązanie dla szablonów)
@Component({
selector: 'app-user-list',
template: `
<!-- Async pipe automatycznie zarządza subskrypcją -->
<div *ngFor="let user of users$ | async">
{{ user.name }}
</div>
<!-- Działa również z pojedynczymi wartościami -->
<h1>{{ title$ | async }}</h1>
`
})
export class UserListComponent {
// Observable wystawione bezpośrednio - bez ręcznej subskrypcji
users$ = this.userService.getUsers();
title$ = this.userService.getTitle();
constructor(private userService: UserService) {}
// Brak potrzeby ngOnDestroy - async pipe wszystko czyści
}
// PATTERN 3: DestroyRef (Angular 16+, nowoczesne podejście)
@Component({
selector: 'app-notifications',
template: `
<div *ngFor="let notification of notifications">
{{ notification.message }}
</div>
`
})
export class NotificationsComponent implements OnInit {
// Wstrzyknięcie DestroyRef
private destroyRef = inject(DestroyRef);
notifications: any[] = [];
constructor(private notificationService: NotificationService) {}
ngOnInit() {
// takeUntilDestroyed automatycznie używa DestroyRef
this.notificationService.getNotifications()
.pipe(takeUntilDestroyed(this.destroyRef))
.subscribe(notifications => this.notifications = notifications);
// Działa również w injection context (constructor)
interval(5000)
.pipe(takeUntilDestroyed(this.destroyRef))
.subscribe(() => this.checkNewNotifications());
}
private checkNewNotifications() {
// Logika sprawdzania nowych powiadomień
}
}
// PATTERN 4: Ręczne zarządzanie subskrypcjami (gdy inne nie działają)
@Component({
selector: 'app-dashboard',
template: `<div>Dashboard</div>`
})
export class DashboardComponent implements OnInit, OnDestroy {
private subscriptions = new Subscription();
ngOnInit() {
// Dodawanie wszystkich subskrypcji do jednej głównej
this.subscriptions.add(
interval(1000).subscribe(val => console.log(val))
);
this.subscriptions.add(
fromEvent(window, 'resize').subscribe(() => this.onResize())
);
// Można również używać add() w chain
const sub1 = interval(2000).subscribe();
const sub2 = interval(3000).subscribe();
this.subscriptions.add(sub1).add(sub2);
}
onResize() {
// Logika obsługi resize
}
ngOnDestroy() {
// Anulowanie wszystkich subskrypcji jednocześnie
this.subscriptions.unsubscribe();
}
}
// PRZYKŁAD: Które Observable wymagają czyszczenia?
@Component({
selector: 'app-examples',
template: `<div>Przykłady</div>`
})
export class ExamplesComponent implements OnInit {
private destroy$ = new Subject<void>();
constructor(
private http: HttpClient,
private route: ActivatedRoute
) {}
ngOnInit() {
// ✅ NIE wymaga unsubscribe - kończy się po jednej emisji
this.http.get('/api/users')
.subscribe(users => console.log(users));
// ✅ NIE wymaga unsubscribe - Observable kończy się z take(1)
interval(1000)
.pipe(take(1))
.subscribe(val => console.log(val));
// ❌ WYMAGA unsubscribe - nieskończony strumień
interval(1000)
.pipe(takeUntil(this.destroy$))
.subscribe(val => console.log(val));
// ⚠️ ZALECANE czyszczenie - choć Angular zarządza, lepiej być pewnym
this.route.params
.pipe(takeUntil(this.destroy$))
.subscribe(params => console.log(params));
// ❌ WYMAGA unsubscribe - event listener
fromEvent(document, 'mousemove')
.pipe(takeUntil(this.destroy$))
.subscribe(event => console.log(event));
// ❌ WYMAGA unsubscribe - Subject/BehaviorSubject
const subject$ = new BehaviorSubject(0);
subject$
.pipe(takeUntil(this.destroy$))
.subscribe(val => console.log(val));
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
// HELPER: Bazowy komponent z wbudowanym destroy$
export abstract class DestroyableComponent implements OnDestroy {
protected destroy$ = new Subject<void>();
ngOnDestroy(): void {
this.destroy$.next();
this.destroy$.complete();
}
}
// Użycie w komponencie
@Component({
selector: 'app-my-component',
template: `<div>My Component</div>`
})
export class MyComponent extends DestroyableComponent implements OnInit {
ngOnInit() {
// destroy$ dostępny z klasy bazowej
interval(1000)
.pipe(takeUntil(this.destroy$))
.subscribe(val => console.log(val));
}
}
Materiały:
- RxJS Best Practices - Memory Leaks
- Angular Official - Unsubscribing
- takeUntilDestroyed API
- Ben Lesh - RxJS: Don't Unsubscribe
- RxJS ESLint Rules