2.10可观察对象 (Observables) 与 RxJS


文档摘要

2.10可观察对象 (Observables) 与 RxJS 2.10 可观察对象 (Observables) 与 RxJS 在 Angular 应用中,可观察对象 (Observables) 是处理异步操作和事件的关键组成部分。它们提供了一种强大的机制来管理随时间推移的数据流,并允许我们使用 RxJS (Reactive Extensions for JavaScript) 提供的丰富的操作符来转换、过滤和组合这些数据流。 2.10.1 什么是可观察对象 (Observables)? 可观察对象可以被认为是一个随时间推移发出多个值的流。与 Promise 不同,Promise 只发出一个值,可观察对象可以发出零个、一个或多个值,并且可以最终完成或抛出错误。

## 2.10可观察对象 (Observables) 与 RxJS ## 2.10 可观察对象 (Observables) 与 RxJS 在 Angular 应用中,可观察对象 (Observables) 是处理异步操作和事件的关键组成部分。它们提供了一种强大的机制来管理随时间推移的数据流,并允许我们使用 RxJS (Reactive Extensions for JavaScript) 提供的丰富的操作符来转换、过滤和组合这些数据流。 ### 2.10.1 什么是可观察对象 (Observables)? 可观察对象可以被认为是一个随时间推移发出多个值的流。与 Promise 不同,Promise 只发出一个值,可观察对象可以发出零个、一个或多个值,并且可以最终完成或抛出错误。 **核心概念:** * **Observable (可观察对象):** 定义一个数据流,它可以发出零个或多个值,并最终完成或抛出错误。 * **Observer (观察者):** 订阅可观察对象,并接收它发出的值、完成通知或错误通知。 * **Subscription (订阅):** 表示观察者与可观察对象之间的连接。通过取消订阅,可以停止接收值。 * **Operators (操作符):** 用于转换、过滤和组合可观察对象发出的值。 * **Subject (主题):** 一种特殊类型的可观察对象,可以同时作为可观察对象和观察者。它可以多播值给多个订阅者。 **Graph TD 图解:** ```mermaid graph TD A[Observable] -->|subscribe| B(Observer) B -- onNext --> C{Value} B -- onError --> D[Error] B -- onComplete --> E[Complete] C --> A ``` ### 2.10.2 为什么使用可观察对象 (Observables)? * **处理异步操作:** 可观察对象非常适合处理 HTTP 请求、用户输入、定时器等异步操作。 * **处理事件流:** 它们可以用来处理 DOM 事件、WebSocket 消息等事件流。 * **响应式编程:** 可观察对象是响应式编程的核心,它允许我们以声明式的方式处理数据流。 * **强大的操作符:** RxJS 提供了大量的操作符,可以方便地转换、过滤和组合数据流。 * **取消订阅:** 可以通过取消订阅来停止接收值,避免内存泄漏。 ### 2.10.3 创建可观察对象 (Observables) 可以使用 `new Observable()` 构造函数或 RxJS 提供的创建操作符来创建可观察对象。 **使用 `new Observable()`:** ```typescript import { Observable } from 'rxjs'; const myObservable = new Observable((observer) => { observer.next(1); observer.next(2); observer.next(3); setTimeout(() => { observer.next(4); observer.complete(); }, 1000); }); myObservable.subscribe({ next: (value) => console.log('Value:', value), error: (error) => console.error('Error:', error), complete: () => console.log('Completed'), }); ``` **使用创建操作符 (例如 `of`, `from`, `interval`):** ```typescript import { of, from, interval } from 'rxjs'; // of: 发出指定的值 const numbers$ = of(1, 2, 3, 4, 5); numbers$.subscribe((value) => console.log('of Value:', value)); // from: 从数组、Promise 或迭代器创建可观察对象 const myArray = [10, 20, 30]; const myArray$ = from(myArray); myArray$.subscribe((value) => console.log('from Value:', value)); // interval: 每隔一段时间发出一个递增的数字 const timer$ = interval(1000); // 每秒发出一个数字 const subscription = timer$.subscribe((value) => console.log('interval Value:', value)); // 停止发出值 setTimeout(() => { subscription.unsubscribe(); console.log('interval Unsubscribed'); }, 5000); ``` ### 2.10.4 订阅可观察对象 (Subscribing to Observables) 使用 `subscribe()` 方法订阅可观察对象。`subscribe()` 方法接收一个观察者对象,该对象包含 `next`、`error` 和 `complete` 三个回调函数。 ```typescript import { Observable } from 'rxjs'; const myObservable = new Observable((observer) => { observer.next('Hello'); observer.next('World'); observer.complete(); }); const mySubscription = myObservable.subscribe({ next: (value) => console.log('Received:', value), error: (error) => console.error('Error:', error), complete: () => console.log('Observable completed'), }); // 取消订阅 // mySubscription.unsubscribe(); ``` ### 2.10.5 RxJS 操作符 (Operators) RxJS 提供了大量的操作符,可以用于转换、过滤和组合可观察对象发出的值。 **常用的操作符:** * **`map`:** 将可观察对象发出的每个值转换为另一个值。 * **`filter`:** 过滤可观察对象发出的值,只发出满足条件的值。 * **`reduce`:** 将可观察对象发出的所有值聚合成一个值。 * **`scan`:** 类似于 `reduce`,但它会发出每个中间聚合值。 * **`tap`:** 在不改变值的情况下执行副作用操作 (例如,记录日志)。 * **`take`:** 只发出指定数量的值。 * **`debounceTime`:** 在指定的时间内没有发出新的值时,才发出最后一个值 (常用于处理用户输入)。 * **`mergeMap` (aka `flatMap`):** 将可观察对象发出的每个值转换为另一个可观察对象,并将这些可观察对象合并为一个可观察对象。 * **`switchMap`:** 将可观察对象发出的每个值转换为另一个可观察对象,并取消订阅前一个可观察对象,只发出最后一个可观察对象的值 (常用于处理搜索)。 * **`concatMap`:** 将可观察对象发出的每个值转换为另一个可观察对象,并按顺序发出这些可观察对象的值。 * **`forkJoin`:** 等待所有输入的可观察对象完成,然后发出一个包含每个可观察对象最后一个值的数组。 **代码示例:** ```typescript import { of } from 'rxjs'; import { map, filter, tap, take } from 'rxjs/operators'; const numbers$ = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); numbers$ .pipe( filter((value) => value % 2 === 0), // 过滤出偶数 map((value) => value * 2), // 将偶数乘以 2 tap((value) => console.log('Processed:', value)), // 记录处理后的值 take(3) // 只取前 3 个值 ) .subscribe({ next: (value) => console.log('Result:', value), complete: () => console.log('Finished'), }); ``` **Graph TD 图解 `mergeMap` vs `switchMap`:** ```mermaid graph TD A[Source Observable] --> B{mergeMap} A --> C{switchMap} B --> D[Inner Observable 1] B --> E[Inner Observable 2] C --> F[Inner Observable 1] C --> G[Inner Observable 2] F -->|cancel previous| G D --> H[Output Observable (concurrent)] E --> H G --> I[Output Observable (latest)] ``` ### 2.10.6 Subject Subject 是一种特殊类型的可观察对象,可以同时作为可观察对象和观察者。它可以多播值给多个订阅者。 **常用的 Subject 类型:** * **`Subject`:** 基本的 Subject 类型,没有特殊行为。 * **`BehaviorSubject`:** 在订阅时立即发出最后一个值。 * **`ReplaySubject`:** 缓存指定数量的值,并在订阅时重放这些值。 * **`AsyncSubject`:** 只在可观察对象完成时发出最后一个值。 **代码示例:** ```typescript import { Subject } from 'rxjs'; const mySubject = new Subject(); mySubject.subscribe({ next: (value) => console.log('Observer 1:', value), }); mySubject.subscribe({ next: (value) => console.log('Observer 2:', value), }); mySubject.next(1); mySubject.next(2); mySubject.next(3); ``` ### 2.10.7 在 Angular 中使用可观察对象 (Observables) 在 Angular 中,可观察对象广泛用于处理 HTTP 请求、表单事件、路由事件等。 **HTTP 请求:** `HttpClient` 服务返回可观察对象。 ```typescript import { HttpClient } from '@angular/common/http'; import { Injectable } from '@angular/core'; import { Observable } from 'rxjs'; @Injectable({ providedIn: 'root', }) export class DataService { private apiUrl = 'https://jsonplaceholder.typicode.com/todos/1'; constructor(private http: HttpClient) {} getData(): Observable { return this.http.get(this.apiUrl); } } ``` **在组件中使用:** ```typescript import { Component, OnInit } from '@angular/core'; import { DataService } from './data.service'; @Component({ selector: 'app-my-component', template: `

{{ data | json }}

`, }) export class MyComponent implements OnInit { data: any; constructor(private dataService: DataService) {} ngOnInit(): void { this.dataService.getData().subscribe({ next: (data) => { this.data = data; }, error: (error) => { console.error('Error fetching data:', error); }, }); } } ``` **使用 `async` 管道简化模板绑定:** ```typescript import { Component } from '@angular/core'; import { DataService } from './data.service'; import { Observable } from 'rxjs'; @Component({ selector: 'app-my-component', template: `

{{ data$ | async | json }}

`, }) export class MyComponent { data$: Observable; constructor(private dataService: DataService) { this.data$ = this.dataService.getData(); } } ``` ### 2.10.8 错误处理 处理可观察对象中的错误非常重要。可以使用 `catchError` 操作符来捕获错误并返回一个新的可观察对象。 ```typescript import { HttpClient } from '@angular/common/http'; import { Injectable } from '@angular/core'; import { Observable, throwError } from 'rxjs'; import { catchError } from 'rxjs/operators'; @Injectable({ providedIn: 'root', }) export class DataService { private apiUrl = 'https://jsonplaceholder.typicode.com/todos/invalid-url'; // 故意设置一个无效的 URL constructor(private http: HttpClient) {} getData(): Observable { return this.http.get(this.apiUrl).pipe( catchError((error) => { console.error('Error occurred:', error); // 可以返回一个默认值,或者重新抛出错误 return throwError(() => new Error('Something bad happened; please try again later.')); }) ); } } ``` ### 2.10.9 取消订阅 (Unsubscribing) 取消订阅可观察对象非常重要,以避免内存泄漏。 在组件销毁时,应该取消订阅所有可观察对象。 **在组件中使用 `ngOnDestroy`:** ```typescript import { Component, OnInit, OnDestroy } from '@angular/core'; import { DataService } from './data.service'; import { Subscription } from 'rxjs'; @Component({ selector: 'app-my-component', template: `

{{ data | json }}

`, }) export class MyComponent implements OnInit, OnDestroy { data: any; private subscription: Subscription; constructor(private dataService: DataService) {} ngOnInit(): void { this.subscription = this.dataService.getData().subscribe({ next: (data) => { this.data = data; }, error: (error) => { console.error('Error fetching data:', error); }, }); } ngOnDestroy(): void { // 取消订阅以防止内存泄漏 if (this.subscription) { this.subscription.unsubscribe(); } } } ``` **使用 `takeUntil` 操作符:** `takeUntil` 操作符允许我们在另一个可观察对象发出值时自动取消订阅。 ```typescript import { Component, OnInit, OnDestroy } from '@angular/core'; import { DataService } from './data.service'; import { Subject, takeUntil } from 'rxjs'; @Component({ selector: 'app-my-component', template: `

{{ data | json }}

`, }) export class MyComponent implements OnInit, OnDestroy { data: any; private destroy$ = new Subject(); constructor(private dataService: DataService) {} ngOnInit(): void { this.dataService .getData() .pipe(takeUntil(this.destroy$)) // 当 destroy$ 发出值时,自动取消订阅 .subscribe({ next: (data) => { this.data = data; }, error: (error) => { console.error('Error fetching data:', error); }, }); } ngOnDestroy(): void { this.destroy$.next(); this.destroy$.complete(); } } ``` ### 2.10.10 总结 可观察对象和 RxJS 是 Angular 中处理异步操作和事件流的强大工具。 掌握这些概念对于构建高效和响应式的 Angular 应用至关重要。 通过使用 RxJS 提供的丰富操作符,我们可以以声明式的方式转换、过滤和组合数据流,从而简化代码并提高可维护性。 记住始终取消订阅可观察对象以避免内存泄漏。

发布者: 作者: 转发
评论区 (0)
U