יש פונקציות שמחזירות observable, כמו הפונקציה fromEvent שראינו במאמר הקודם על יסודות rxjs. אפשר גם ליצור observable ישירות.
ניצור אובייקט observable ונעביר פונקציה ל-constructor שלו. הפונקציה מקבלת משתנה: subscriber, שמקביל ל-observer שראינו, מה שלוקח את הערך שיוצא מה-pipeline ועושה איתו משהו.
const{ Observable } = Rx;
const observable = new Observable((subscriber) => {
// Calling next means throw the value 1 into our pipeline.
subscriber.next(1);
// Calling complete means we stop emitting events for this observable.
subscriber.complete();
// Calling error anytime that something went wrong in out code.
subscriber.error(new Error('Some error text'));
});
observable.subscribe({
next(value){
console.log('Got the value', value);
},
complete(){
console.log('Completed, no more values');
},
error(err){
console.log('Something went wrong', err.message);
}
});
// Only for the test tool
observable;
ככה תראה ההפעלה של הקוד.
בדוגמא פה מכיוון שהפעלנו את complete לפני error, לא נגיע לפעולה של ה-error.
יש דרך נוספת שבה קוראים לפונקציות של ה-observable.
observable.subscribe(
(value) => console.log('Got the value', value),
(error) => console.log('Something went wrong', error.message),
() => console.log('Completed, no more values')
);
לא תמיד יהיה לנו טיפול בשגיאות והרבה פעמים לא נראה טיפול ב-complete.
Unicast Observable
נעשה שינויים בתוכנית ממקודם:
- נשלח עוד ערכים עם פונקציית next.
- נשתמש ב-tap operator. מה שהוא עושה זה לשלוח לקונסול כשיש לנו ערך חדש. נפתח pipe ונשתמש שם ב-tap.
const{ Observable } = Rx;
const { tap } = RxOperators;
const observable = new Observable((subscriber) => {
// Calling next means throw the value 1 into our pipeline.
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
// Calling complete means we stop emitting events for this observable.
subscriber.complete();
// Calling error anytime that something went wrong in out code.
subscriber.error(new Error('Some error text'));
}).pipe(
tap(value => console.log('from tap:', value))
);
observable.subscribe(
(value) => console.log('Got the value', value),
(error) => console.log('Something went wrong', error.message),
() => console.log('Completed, no more values')
);
התוצאה שנראה בקונסול היא:
from tap: 1
Got the value 1
from tap: 2
Got the value 2
from tap: 3
Got the value 3
Completed, no more values
עכשיו נוסיף עוד observable.
observable.subscribe(
(value) => console.log('Got the value', value),
(error) => console.log('Something went wrong', error.message),
() => console.log('Completed, no more values')
);
observable.subscribe(
(value) => console.log('From second subscribe', value),
);
וזה מה שנקבל:
from tap: 1
Got the value 1
from tap: 2
Got the value 2
from tap: 3
Got the value 3
Completed, no more values
from tap: 1
From second subscribe 1
from tap: 2
From second subscribe 2
from tap: 3
From second subscribe 3
יש לנו 2 סטים של הדפסות. מה שמוזר זה שאנחנו רואים את ההדפסה של tap פעמיים.
המידע יוצא מתוך ה-observable, הוא מפעיל את הטריגר של הפעולה. כל מספר עובר דרך ה-tap ומודפס ואז עובר לתוך ה-observer. למה זה לא קורה פעם אחת ועובר לשניהם?
ברגע שאנחנו מפעילים את subscribe, ה-observable שלנו מופעל. הערכים יוצאים ועוברים ל-tap ומודפסים ועוברים לפעול ב-observable.
ואז יש לנו subscribe של ה-observable השני. ושוב הפונקציות מופעלות ו-tap עובד שוב ושולח את הערכים ל-observer.
זה נקרא Unicast observer, יש לנו סט ערכים נפרד לכל observer שנרשם.
יכולה להיות בעיה אם פונים למשאבים עם המידע שיוצא, וכל פעם כשיש לנו subscribe נקבל קריאה נוספת להביא מידע, למרות שיתכן שכבר יש לנו אותו.
Multicast Observable
אנחנורוצים ליצור מצב שבו כשיש טריגר ל-observable שלנו, כל מי ש-subscribe יקבל המידע המשותף.
צריך לדעת שאם קוראים ל-error או complete ואז יש subscribe נוסף, הפעולה של ה-observab;e תתחיל מחדש.
כדי להפוך observable ל-multicast נשתמש באופרטור share.
const{ Observable } = Rx;
const { tap, share } = RxOperators;
const observable = new Observable((subscriber) => {
// Calling next means throw the value 1 into our pipeline.
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
}).pipe(
tap(value => console.log('from tap:', value)),
share()
);
observable.subscribe(
(value) => console.log('Got the value', value),
(error) => console.log('Something went wrong', error.message),
() => console.log('Completed, no more values')
);
observable.subscribe(
(value) => console.log('From second subscribe', value),
);
במקרה של הקוד שלפנינו, זאת תהיה ההדפסה:
from tap: 1
Got the value 1
from tap: 2
Got the value 2
from tap: 3
Got the value 3
ה-observer הראשון גרם לטריגר של האירועים, והשני, כשהוא עשה subscribe והלך לראות מה האירועים שמתרחשים, כבר לא היו אירועים. הם כבר הופעלו קודם. אן יהיה אירוע נוסף שיופעל אחרי ששני ה-observers ביצעו subscribe, שניהם יקלטו אותו.
אם נוסיף subscriber.complete נקבל איפוס של הקריאה ושני ה-observables יקבלו את המידע.
const observable = new Observable((subscriber) => {
// Calling next means throw the value 1 into our pipeline.
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
}).pipe(
tap(value => console.log('from tap:', value)),
share()
);
Hot vs. Cold Observables
הגדרה של Hot observable אומרת שיש לנו זרם של אירועים משותפים לכל הנרשמים של observable. חדשים וישנים. דומה לרעיון של multicast.
הגדרה של Cold Observable אומרת שלכל subscriber יש נו יצירה מחדש של זרם האירועים. דומה לרעיון של unicast.