在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
开源软件名称:ReactiveX/RxGo开源软件地址:https://github.com/ReactiveX/RxGo开源编程语言:Go 100.0%开源软件介绍:RxGoReactive Extensions for the Go Language ReactiveXReactiveX, or Rx for short, is an API for programming with Observable streams. This is the official ReactiveX API for the Go language. ReactiveX is a new, alternative way of asynchronous programming to callbacks, promises, and deferred. It is about processing streams of events or items, with events being any occurrences or changes within the system. A stream of events is called an Observable. An operator is a function that defines an Observable, how and when it should emit data. The list of operators covered is available here. RxGoThe RxGo implementation is based on the concept of pipelines. A pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function. Let's see a concrete example with each box being an operator:
In this example, the final items are sent in a channel, available to a consumer. There are many ways to consume or to produce data using RxGo. Publishing the results in a channel is only one of them. Each operator is a transformation stage. By default, everything is sequential. Yet, we can leverage modern CPU architectures by defining multiple instances of the same operator. Each operator instance being a goroutine connected to a common channel. The philosophy of RxGo is to implement the ReactiveX concepts and leverage the main Go primitives (channels, goroutines, etc.) so that the integration between the two worlds is as smooth as possible. Installation of RxGo v2
Getting StartedHello WorldLet's create our first Observable and consume an item: observable := rxgo.Just("Hello, World!")()
ch := observable.Observe()
item := <-ch
fmt.Println(item.V) The By the way, the Once the Observable is created, we can observe it using We consumed an item from this channel and printed its value of the item using An item is a wrapper on top of a value or an error. We may want to check the type first like this: item := <-ch
if item.Error() {
return item.E
}
fmt.Println(item.V)
By default, an Observable is stopped once an error is produced. However, there are special operators to deal with errors (e.g., It is also possible to consume items using callbacks: observable.ForEach(func(v interface{}) {
fmt.Printf("received: %v\n", v)
}, func(err error) {
fmt.Printf("error: %e\n", err)
}, func() {
fmt.Println("observable is closed")
}) In this example, we passed three functions:
<-observable.ForEach(...) Real-World ExampleLet's say we want to implement a stream that consumes the following type Customer struct {
ID int
Name, LastName string
Age int
TaxNumber string
} We create a producer that will emit // Create the input channel
ch := make(chan rxgo.Item)
// Data producer
go producer(ch)
// Create an Observable
observable := rxgo.FromChannel(ch) Then, we need to perform the two following operations:
As the enriching step is IO-bound, it might be interesting to parallelize it within a given pool of goroutines.
Yet, let's imagine that all the observable.
Filter(func(item interface{}) bool {
// Filter operation
customer := item.(Customer)
return customer.Age > 18
}).
Map(func(_ context.Context, item interface{}) (interface{}, error) {
// Enrich operation
customer := item.(Customer)
taxNumber, err := getTaxNumber(customer)
if err != nil {
return nil, err
}
customer.TaxNumber = taxNumber
return customer, nil
},
// Create multiple instances of the map operator
rxgo.WithPool(pool),
// Serialize the items emitted by their Customer.ID
rxgo.Serialize(func(item interface{}) int {
customer := item.(Customer)
return customer.ID
}), rxgo.WithBufferedChannel(1)) In the end, we consume the items using for customer := range observable.Observe() {
if customer.Error() {
return err
}
fmt.Println(customer)
} Observable TypesHot vs. Cold ObservablesIn the Rx world, there is a distinction between cold and hot Observables. When the data is produced by the Observable itself, it is a cold Observable. When the data is produced outside the Observable, it is a hot Observable. Usually, when we don't want to create a producer over and over again, we favour a hot Observable. In RxGo, there is a similar concept. First, let's create a hot Observable using ch := make(chan rxgo.Item)
go func() {
for i := 0; i < 3; i++ {
ch <- rxgo.Of(i)
}
close(ch)
}()
observable := rxgo.FromChannel(ch)
// First Observer
for item := range observable.Observe() {
fmt.Println(item.V)
}
// Second Observer
for item := range observable.Observe() {
fmt.Println(item.V)
} The result of this execution is:
It means the first Observer already consumed all items. And nothing left for others. On the other hand, let's create a cold Observable using observable := rxgo.Defer([]rxgo.Producer{func(_ context.Context, ch chan<- rxgo.Item) {
for i := 0; i < 3; i++ {
ch <- rxgo.Of(i)
}
}})
// First Observer
for item := range observable.Observe() {
fmt.Println(item.V)
}
// Second Observer
for item := range observable.Observe() {
fmt.Println(item.V)
} Now, the result is: 0
1
2
0
1
2 In the case of a cold observable, the stream was created independently for every Observer. Again, hot vs cold Observables are not about how you consume items, it's about where data is produced. BackpressureThere is another operator called A use case with Once we start observing an Observable created with observable := rxgo.FromEventSource(input, rxgo.WithBackPressureStrategy(rxgo.Drop)) The By default, a channel connecting operators is non-buffered. We can override this behaviour like this: observable.Map(transform, rxgo.WithBufferedChannel(42)) Each operator has an Lazy vs. Eager ObservationThe default observation strategy is lazy. It means an operator processes the items emitted by an Observable once we start observing it. We can change this behaviour this way: observable := rxgo.FromChannel(ch).Map(transform, rxgo.WithObservationStrategy(rxgo.Eager)) In this case, the Sequential vs. Parallel OperatorsBy default, each operator is sequential. One operator being one goroutine instance. We can override it using the following option: observable.Map(transform, rxgo.WithPool(32)) In this example, we create a pool of 32 goroutines that consume items concurrently from the same channel. If the operation is CPU-bound, we can use the Connectable ObservableA Connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when its connect() method is called. In this way, you can wait for all intended Subscribers to subscribe to the Observable before the Observable begins emitting items. Let's create a Connectable Observable using ch := make(chan rxgo.Item)
go func() {
ch <- rxgo.Of(1)
ch <- rxgo.Of(2)
ch <- rxgo.Of(3)
close(ch)
}()
observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy()) Then, we create two Observers: observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
return i.(int) + 1, nil
}).DoOnNext(func(i interface{}) {
fmt.Printf("First observer: %d\n", i)
})
observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
return i.(int) * 2, nil
}).DoOnNext(func(i interface{}) {
fmt.Printf("Second observer: %d\n", i)
}) If observable.Connect() Once There is another important change with a regular Observable. A Connectable Observable publishes its items. It means all the Observers receive a copy of the items. Here is an example with a regular Observable: ch := make(chan rxgo.Item)
go func() {
ch <- rxgo.Of(1)
ch <- rxgo.Of(2)
ch <- rxgo.Of(3)
close(ch)
}()
// Create a regular Observable
observable := rxgo.FromChannel(ch)
// Create the first Observer
observable.DoOnNext(func(i interface{}) {
fmt.Printf("First observer: %d\n", i)
})
// Create the second Observer
observable.DoOnNext(func(i interface{}) {
fmt.Printf("Second observer: %d\n", i)
})
Now, with a Connectable Observable: ch := make(chan rxgo.Item)
go func() {
ch <- rxgo.Of(1)
ch <- rxgo.Of(2)
ch <- rxgo.Of(3)
close(ch)
}()
// Create a Connectable Observable
observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())
// Create the first Observer
observable.DoOnNext(func(i interface{}) {
fmt.Printf("First observer: %d\n", i)
})
// Create the second Observer
observable.DoOnNext(func(i interface{}) {
fmt.Printf("Second observer: %d\n", i)
})
disposed, cancel := observable.Connect()
go func() {
// Do something
time.Sleep(time.Second)
// Then cancel the subscription
cancel()
}()
// Wait for the subscription to be disposed
<-disposed
Observable, Single, and Optional SingleAn Iterable is an object that can be observed using An Iterable can be either:
DocumentationPackage documentation: https://pkg.go.dev/github.com/reactivex/rxgo/v2 Assert APIHow to use the assert API to write unit tests while using RxGo. Operator OptionsCreating Observables
Transforming Observables
Filtering Observables
Combining Observables
Error Handling Operators
Observable Utility Operators
Conditional and Boolean Operators
Mathematical and Aggregate Operators
|
请发表评论