Observables

מה זה Observables?

אנחנו משתמשים ב-Observables לבצע פעולות א-סינכרוניות ולטפל במידע א-סינכרוני. עוד דרך לטפל במידע א-סינכרוני ב-JS הוא על ידי שימוש ב-Promises.

הקוד ב-JS מתבצע שורה אחרי שורה. כששורה אחת מסיימת את הפעולה שלה, נקראת השורה הבאה. אם משימה לוקחת הרבה זמן, למשל יש לנו הבאת נתונים ממסד נתונים, השורה הבאה תצטרך להמתין הרבה זמן. הקוד הסינכרוני חוסם את התוכנית.

קוד א-סינכרוני פועל ברקע בלי לחסום את התוכנית. אנחנו יכולים לבצע קריאת HTTP שתעבוד ברקע והקוד ימשיך לעבוד. הקריאה לא תחסום אותו.

בשימוש ב-Promise, ה-Promise יחכה שכל המידע יאסף ואז ישלח אותו. בשימוש ב-Observable הוא ישלח את המידע ברצף של חלקים, streaming.

הבדך נוסף הוא ש-Promise יביא את המידע בכל מקרה, גם אם אף אחד לש משתמש בו. Observable יביא את המידע רק אם יש מישהו שמשתמש בו.

Observable הוא פונקציה שממירה מידע רגיל לרצף של מידע (stream).

ה-Observer

ל-Rxjs יש שני שחקנים עיקריים. ה-Observables שזה רצף המידע וה-Observer שהולך להשתמש במידע הזה. כדי שה-Observer יוכל לקבל את המידע הוא צריך להרשם Subscribe ל-Observable.

יצירת Observable

על מנת לראות את Rxjs בעבודה, ניצור קומפוננטה חדשה:

ng g c components/rxjs-learning

כדי להשתמש בפונקציות של Rxjs צריך לייבא את החלקים שמשתמשים בהם מהספרייה.

נגדיר משתנה Observable בשם myObservable. ל-constructor של ה-Observable שיצרנו צריך להעביר פונקציית callback שתקבל את ה-Observer. ה-Observer הוא ה-subscriber שמחכה למידע.

myObservable = new Observable((observer) => {
    console.log("Observable starts");
});

עכשיו נשלח מידע על ידי קריאה לפונקציה next.

ה-observer יטריג את המידע רק אם יש subscriber.

myObservable = new Observable((observer) => {
    console.log("Observable starts");
    observer.next("1");
    observer.next("2");
    observer.next("3");
    observer.next("4");
    observer.next("5");
});

ngOnInit(){
    this.myObservable.subscribe();
}

Subscribe

לפונקציה subscribe יש 3 פרמטרים אופציונלים. שלושתם פונקציות callback. הפונקציות הן next, error, complete.

הפונקציה next נקראת בכל פעם שה-Observable יחזיר ערך. בדוגמא, הפונקציה next תקרא 5 פעמים בגלל שה-Observable מטריג 5 פעמים מידע.

myObservable = new Observable((observer) => {
    console.log("Observable starts");
    observer.next("1");
    observer.next("2");
    observer.next("3");
    observer.next("4");
    observer.next("5");
});

ngOnInit(){
    this.myObservable.subscribe((result) => {
      console.log(result);
    });
}

נראה את הזרימה של הנתונים לאורך זמן על ידי שימוש בטיימר.

myObservable = new Observable((observer) => {
    console.log("Observable starts");
    setTimeout(() => {observer.next("1")}, 1000);
    setTimeout(() => {observer.next("2")}, 2000);
    setTimeout(() => {observer.next("3")}, 3000);
    setTimeout(() => {observer.next("4")}, 4000);
    setTimeout(() => {observer.next("5")}, 5000);
});

ngOnInit(){
    this.myObservable.subscribe((result) => {
      console.log(result);
    });
}

Error & Completion of Observable

כמו ש-Observable יוצר אירוע כשזורם אליו מידע, הוא יכול גם ליצור אירוע שגיאה. אירוע נוסף שהוא יוצר זה כשפעולת הזרמת המידע הושלמה.

כדי לקבל אירוע שגיאה מ-Observable נשתמש בפונקציית error.

אנחנו לא נמשיך לקבל מידע אחרי שה-Observable הטריג אירוע שגיאה.

myObservable = new Observable((observer) => {
    console.log("Observable starts");
    setTimeout(() => {observer.next("1")}, 1000);
    setTimeout(() => {observer.next("2")}, 2000);
    setTimeout(() => {observer.next("3")}, 3000);
    setTimeout(() => {observer.error(new Error('Something is wrong!'))}, 3000);
    setTimeout(() => {observer.next("4")}, 4000);
    setTimeout(() => {observer.next("5")}, 5000);
});

השגיאה לא אומרת כלום למשתמש, אנחנו צריכים לטפל בה באמצעות הפונקציה השנייה של ה-Observer.

ngOnInit(){
    this.myObservable.subscribe(
    result => {
      console.log(result);
    }, error => {
      alert(error.message);
    });
}

עוד אירוע של ה-Observable הוא ה-complete.

myObservable = new Observable((observer) => {
    console.log("Observable starts");
    setTimeout(() => {observer.next("1")}, 1000);
    setTimeout(() => {observer.next("2")}, 2000);
    setTimeout(() => {observer.next("3")}, 3000);
    setTimeout(() => {observer.next("4")}, 4000);
    setTimeout(() => {observer.next("5")}, 5000);
    setTimeout(() => {observer.complete()}, 5000);
});

הטיפול בפעולת ה-complete יהיה בפונקציית ה-callback השלישית של ה-subscriber.

ngOnInit(){
    this.myObservable.subscribe(
    result => {
      console.log(result);
    }, error => {
      console.log(error.message);
    }, () => {
      console.log("Observable completed!");
    });
}

לא יתקבל מידע נוסף אחרי פעולת ה-complete. אם נשלחה הודעת error לפני ה-complete, התוכנית לא תבצע את הפעילות של ה-complete.

יצירת Observable עם פונקציית of

אפשר ליצור Observable מתוך רשימת ערכים על ידי הפונקציה of.

export class RxjsLearningComponent {
  array1 = [1, 3, 5, 7, 9];
  array2 = ["a", "b", "c", "d"];

  myObservable2 = of(this.array1, this.array2);

  ngOnInit(){
    this.myObservable2.subscribe({
      next: result => {
        console.log(result);
      },
      error: error => {
        console.log(error.message);
      },
      complete: () => {
        console.log("Observable2 completed!");
      }
    });
  }

האופרטור of יפעיל את פעולת ה-complete אחרי שהוא יטריג את כל הערכים שהוא קבל. אם שלחנו ערך כמערך, הוא עובר כמערך ל-observable.

יצירת Observable עם פונקציית from

אפשר ליצור observable עם הפונקציה from. כדי ליצור את ה-observable נעביר אובייקט איטרטיבי כמו מערך או מחרוזת.

export class RxjsLearningComponent {
  array1 = [1, 3, 5, 7, 9];

  myObservable3 = from(this.array1);

  ngOnInit(){
    this.myObservable3.subscribe({
      next: result => {
        console.log(result);
      },
      error: error => {
        console.log(error.message);
      },
      complete: () => {
        console.log("Observable2 completed!");
      }
    });
  }

בשונה מ-of כאן המערך לא נכנס בבת אחת כמערך, אלא הערכים נכנסים אחד אחד. האופרטור from מקבל ערך אחד בלבד. from יכול גם להפוך promise ל-observable.