Programowanie Reaktywne - Nie zapominaj - Subscribe.

06.02.2018


Artykuł ten jest częścią serii artykułów na temat Programowania reaktywnego.

Zapraszam na GitHub-a.

Tematy

  1. Wstęp
  2. Zabawa z czasem - Timer
  3. Kto za tym stoi? - Scheduler
  4. Nie zapominaj - Subscribe
  5. Zabawa z czasem - Interval
  6. Zabawa z czasem - Buffer
  7. Zabawa z czasem - Delay
  8. Zabawa z czasem - Sample
  9. Zabawa z czasem - Throttle
  10. Zabawa z czasem - Timestamp/TimeInterval
  11. Tworzymy dane - Generators
  12. Tworzymy dane - Własna klasa publikująca
  13. Marudzimy - Skip
  14. Marudzimy - Take
  15. Łap To! - ConsoleKey
  16. Kombinatorzy - Concat
  17. Kombinatorzy - Repeat
  18. Kombinatorzy - Start With
  19. Kombinatorzy - Ambiguous
  20. Kombinatorzy - Merge
  21. Kombinatorzy - Zip
  22. Kombinatorzy - Switch
  23. Kombinatorzy - When-And-Then
  24. Kombinatorzy - Combine Latest
  25. Transformers - Select
  26. Transformers - OfType and Cast
  27. Transformers - Metadata
  28. Bileciki do kontroli - Unit Tests of Interval
  29. Bileciki do kontroli - Unit Tests of Observer Interval
  30. Bileciki do kontroli - Unit Tests of Create Cold/Hot Observable
  31. Szpryca - AutoFac

Wstęp

Trochę czasu już upłynęło… Warto by było przyswoić pewne dodatkowe informacje procesie zapisu na strumienie.

Przykłady przedstawione do tej pory w postach są bardzo uproszczone:

1
2
3
4
5
6
var observableTimer = Observable.Timer(_dueTime, _period, scheduler);

observableTimer.Subscribe(index =>
{
    Console.WriteLine($"Lambda: {index}");
});

Subskrybowanie

Reactive Extensions - Subscribe Metoda zapisu przyjmuje (ale nie musi) trzy delegaty:

  • OnNext - jest to ciało delegatu wykonywanego w trakcie publikowania przez obiekt obserwowany treści. To tutaj dzieje się nasza zaimplementowana magia,
  • OnError - oczywiście zdarzyć się może, że nie będzie można zapisać się do strumienia, wówczas zostanie wyzwolony delegat z błędem. Warto by było także skorzystać z tej możliwości,
  • OnCompleted - ostatni trywialny delegat, informuje, gdy strumień się zakończy. W przypadku Timer-a. Ta sytuacja nie nastąpi.

Zapis do strumienia niesie ze sobą pewne niebezpieczeństwo. Istnieje ryzyko wyłożenia subskrypcji zależne od implementacji zawartej w =>(lambda). Nasz wspaniały idealny kod może rzucić wyjątkiem. Wówczas OnNext przestanie być wyzwalane. Dlatego warto by uchwycić co nieco w ciele delegatu.

Nowa klasa StreamTimer zawiera metodę do zapisu.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void Subscribe(Action<long> onNext)
{
  var newSubscribent = _timerObservable.Subscribe(
    onNext,
    OnError,
    OnCompleted
    );

  _subscribents.Add(newSubscribent);
}

private void OnCompleted()
{
  Console.WriteLine("Completed");
}

private void OnError(Exception ex)
{
  Console.WriteLine(ex);
}

public void Dispose()
{
  _subscribents.ForEach(x => x.Dispose());
}

Dzięki klasie stworzony został timer i w nim domyślnie obsługa OnError i OnCompleted. Czyli już nie ma co się nimi martwić.

Kolejny fragment pokazuje jak zapisywać obserwatorów. Pierwszy przykład od linii:1. Nie będzie powodował problemu dlatego pozwoliłem sobie napisać w najprostszej postaci.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
timer.Subscribe(index =>
{
  Console.WriteLine($"YEAH {index}");
});

timer.Subscribe(index =>
{
  try
  {
    throw new Exception("TRAH!");
  }
  catch (Exception e)
  {
    Console.WriteLine(e);
  }
});

timer.Subscribe(index =>
{
  throw new Exception("TRAH!");
});

Reactive Extensions - Subscribe

Linia:6 zawiera w sobie mechanizm obsługi wyjątków, a to dlatego, że rzucamy jednym w przeciwnym wypadku aplikacja zostałaby zatrzymana. W linii:18 zaczyna się fragment kodu, który spowoduje wywalenie programu. W repozytorium jest on za komentowany tak by program się uruchamiał i działał.

Zakończenie

Warto byłoby wspomnieć także, o sprzątaniu po sobie. Czyli używaniu Dispose. Najpierw zapisujemy zapisanego subskrybenta. Tak by potem na koniec można było ich wszystkich wywalić na zbity r__.

1
2
3
4
5
6
7
  _subscribents.Add(newSubscribent);
}

public void Dispose()
{
  _subscribents.ForEach(x => x.Dispose());
}

W linii: 1 zapisujemy a w 6 niszczymy.

Mam nadzieję, że ponownie przybliżyłem Reactive Extensions:).

Pamiętajcie! Łapcie wyjątki puki gorące!


Jest to post wchodzący w skład podjętego wyzwania ogłoszonego przez MIROBURN we vlogu z dnia 3 lutego 2018 roku.

Celem wyzwania jest systematyczne działanie w ciągu 30 dni.

Postanowiłem pisać post dziennie o tematyce Programowania Reaktywnego dla platformy .NET.

Wszelkie źródła związane z postami znajdują się na repozytorium GitHub.

Stan obecny wyzwania: 30 z 30 dni.


Referencje:


Wcześniejszy: Programowanie Reaktywne - Kto za tym stoi? - Scheduler.

Następny: Programowanie Reaktywne - Zabawa z czasem - Interval


Zapisz się na listę :)