RxPY - Working with Observables
  • Date: 2024-11-03


An observable is a function that creates an observer and attaches it to a source from which values are expected, for example, clicks or mouse events from a DOM element.

This chapter covers the following topics in detail.

    Create Observables

    Subscribe and Execute an Observable

Create Observables

To create an observable, use the create() method and pass it a function. That function receives an observer, which exposes the following methods.

    on_next() − This function gets called when the Observable emits an item.

    on_completed() − This function gets called when the Observable is complete.

    on_error() − This function gets called when an error occurs on the Observable.

To use the create() method, first import it as shown below −


from rx import create

Here is a working example that creates an observable −

testrx.py


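from rx import create

def test_observable(observer, scheduler):
   observer.on_next("Hello")     # emit one item
   observer.on_error("Error")    # terminal notification: the stream ends here
   observer.on_completed()       # has no effect, since on_error already ended the stream

source = create(test_observable)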
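The example above calls on_error() and then on_completed(). Under the Rx observer contract these are both terminal notifications, so only the first one reaches the subscriber. The following is a dependency-free sketch of that rule, not RxPY's actual implementation. GuardedObserver and push_values are hypothetical names used only for illustration, and push_values mirrors test_observable.

```python
class GuardedObserver:
    """Hypothetical wrapper: forwards notifications until a terminal one arrives."""

    def __init__(self, on_next, on_error, on_completed):
        self._on_next = on_next
        self._on_error = on_error
        self._on_completed = on_completed
        self._stopped = False

    def on_next(self, value):
        if not self._stopped:
            self._on_next(value)

    def on_error(self, error):
        if not self._stopped:
            self._stopped = True  # terminal: nothing is delivered after this
            self._on_error(error)

    def on_completed(self):
        if not self._stopped:
            self._stopped = True  # terminal: nothing is delivered after this
            self._on_completed()


def push_values(observer, scheduler=None):
    # Same sequence of calls as test_observable in the example above.
    observer.on_next("Hello")
    observer.on_error("Error")
    observer.on_completed()  # dropped: the stream already ended with an error


received = []
push_values(GuardedObserver(
    on_next=lambda v: received.append(("next", v)),
    on_error=lambda e: received.append(("error", e)),
    on_completed=lambda: received.append(("completed",)),
))
print(received)  # [('next', 'Hello'), ('error', 'Error')]
```

In this sketch no "completed" entry is recorded, which is why an observable normally ends with either on_error() or on_completed(), not both.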

Subscribe and Execute an Observable

To subscribe to an observable, call its subscribe() method and pass the callback functions on_next, on_error and on_completed.

Here is a working example −

testrx.py


from rx import create
def test_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

The subscribe() method takes care of executing the observable. The callback functions on_next, on_error and on_completed are passed to subscribe(). The call to subscribe(), in turn, executes the test_observable() function.

It is not mandatory to pass all three callback functions to the subscribe() method. You can pass only those of on_next(), on_error() and on_completed() that you need.
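The two points above (subscribe() is what runs the function given to create(), and any callback may be left out) can be modelled with a small dependency-free sketch. MiniObservable and push_values are hypothetical names, and this is not RxPY's implementation. In particular, treating omitted callbacks as no-ops is an assumption of the sketch, and RxPY's own defaults may differ.

```python
from types import SimpleNamespace


class MiniObservable:
    """Hypothetical stand-in for the object returned by create()."""

    def __init__(self, subscribe_fn):
        self._subscribe_fn = subscribe_fn  # stored here, not called yet

    def subscribe(self, on_next=None, on_error=None, on_completed=None):
        # Assumption of this sketch: omitted callbacks become no-ops.
        ignore = lambda *args: None
        observer = SimpleNamespace(
            on_next=on_next or ignore,
            on_error=on_error or ignore,
            on_completed=on_completed or ignore,
        )
        # The function passed at creation time runs once per subscribe() call.
        self._subscribe_fn(observer, None)


runs = []


def push_values(observer, scheduler):
    runs.append("run")
    observer.on_next("Hello")
    observer.on_completed()


source = MiniObservable(push_values)
print(runs)  # [] because nothing runs before subscribe() is called

got = []
source.subscribe(on_next=got.append)                       # only on_next
source.subscribe(on_completed=lambda: got.append("done"))  # only on_completed
print(runs)  # ['run', 'run']
print(got)   # ['Hello', 'done']
```

Each subscribe() call triggers a fresh run of push_values, and each subscriber only sees the notifications it supplied a callback for.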

Lambda functions are used here for on_next, on_error and on_completed. Each one takes the arguments passed to it and evaluates the given expression.

Here is the output of the observable created −


E:\pyrx>python testrx.py
Got - Hello
Job Done!