Artykuł ten jest częścią serii artykułów na temat Programowania reaktywnego.
Zapraszam na GitHub-a.
Tematy
- Wstęp
 - Zabawa z czasem - Timer
 - Kto za tym stoi? - Scheduler
 - Nie zapominaj - Subscribe
 - Zabawa z czasem - Interval
 - Zabawa z czasem - Buffer
 - Zabawa z czasem - Delay
 - Zabawa z czasem - Sample
 - Zabawa z czasem - Throttle
 - Zabawa z czasem - Timestamp/TimeInterval
 - Tworzymy dane - Generators
 - Tworzymy dane - Własna klasa publikująca
 - Marudzimy - Skip
 - Marudzimy - Take
 - Łap To! - ConsoleKey
 - Kombinatorzy - Concat
 - Kombinatorzy - Repeat
 - Kombinatorzy - Start With
 - Kombinatorzy - Ambiguous
 - Kombinatorzy - Merge
 - Kombinatorzy - Zip
 - Kombinatorzy - Switch
 - Kombinatorzy - When-And-Then
 - Kombinatorzy - Combine Latest
 - Transformers - Select
 - Transformers - OfType and Cast
 - Transformers - Metadata
 - Bileciki do kontroli - Unit Tests of Interval
 - Bileciki do kontroli - Unit Tests of Observer Interval
 - Bileciki do kontroli - Unit Tests of Create Cold/Hot Observable
 - Szpryca - AutoFac
 
Wstęp
Trochę czasu już upłynęło… Warto by było przyswoić pewne dodatkowe informacje procesie zapisu na strumienie.
Przykłady przedstawione do tej pory w postach są bardzo uproszczone:
1
2
3
4
5
6
var observableTimer = Observable.Timer(_dueTime, _period, scheduler);
observableTimer.Subscribe(index =>
{
    Console.WriteLine($"Lambda: {index}");
});
Subskrybowanie
Metoda zapisu przyjmuje (ale nie musi) trzy delegaty:
- OnNext - jest to ciało delegatu wykonywanego w trakcie publikowania przez obiekt obserwowany treści. To tutaj dzieje się nasza zaimplementowana magia,
 - OnError - oczywiście zdarzyć się może, że nie będzie można zapisać się do strumienia, wówczas zostanie wyzwolony delegat z błędem. Warto by było także skorzystać z tej możliwości,
 - OnCompleted - ostatni trywialny delegat, informuje, gdy strumień się zakończy. W przypadku Timer-a. Ta sytuacja nie nastąpi.
 
Zapis do strumienia niesie ze sobą pewne niebezpieczeństwo. Istnieje ryzyko wyłożenia subskrypcji zależne od implementacji zawartej w =>(lambda). Nasz wspaniały idealny kod może rzucić wyjątkiem. Wówczas OnNext przestanie być wyzwalane. Dlatego warto by uchwycić co nieco w ciele delegatu.
Nowa klasa StreamTimer zawiera metodę do zapisu.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void Subscribe(Action<long> onNext)
{
  var newSubscribent = _timerObservable.Subscribe(
    onNext,
    OnError,
    OnCompleted
    );
  _subscribents.Add(newSubscribent);
}
private void OnCompleted()
{
  Console.WriteLine("Completed");
}
private void OnError(Exception ex)
{
  Console.WriteLine(ex);
}
public void Dispose()
{
  _subscribents.ForEach(x => x.Dispose());
}
Dzięki klasie stworzony został timer i w nim domyślnie obsługa OnError i OnCompleted. Czyli już nie ma co się nimi martwić.
Kolejny fragment pokazuje jak zapisywać obserwatorów. Pierwszy przykład od linii:1. Nie będzie powodował problemu dlatego pozwoliłem sobie napisać w najprostszej postaci.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
timer.Subscribe(index =>
{
  Console.WriteLine($"YEAH {index}");
});
timer.Subscribe(index =>
{
  try
  {
    throw new Exception("TRAH!");
  }
  catch (Exception e)
  {
    Console.WriteLine(e);
  }
});
timer.Subscribe(index =>
{
  throw new Exception("TRAH!");
});
Linia:6 zawiera w sobie mechanizm obsługi wyjątków, a to dlatego, że rzucamy jednym w przeciwnym wypadku aplikacja zostałaby zatrzymana. W linii:18 zaczyna się fragment kodu, który spowoduje wywalenie programu. W repozytorium jest on za komentowany tak by program się uruchamiał i działał.
Zakończenie
Warto byłoby wspomnieć także, o sprzątaniu po sobie. Czyli używaniu Dispose. Najpierw zapisujemy zapisanego subskrybenta. Tak by potem na koniec można było ich wszystkich wywalić na zbity r__.
1
2
3
4
5
6
7
  _subscribents.Add(newSubscribent);
}
public void Dispose()
{
  _subscribents.ForEach(x => x.Dispose());
}
W linii: 1 zapisujemy a w 6 niszczymy.
Mam nadzieję, że ponownie przybliżyłem Reactive Extensions:).
Pamiętajcie! Łapcie wyjątki puki gorące!
Jest to post wchodzący w skład podjętego wyzwania ogłoszonego przez MIROBURN we vlogu z dnia 3 lutego 2018 roku.
Celem wyzwania jest systematyczne działanie w ciągu 30 dni.
Postanowiłem pisać post dziennie o tematyce Programowania Reaktywnego dla platformy .NET.
Wszelkie źródła związane z postami znajdują się na repozytorium GitHub.
Stan obecny wyzwania: 30 z 30 dni.
Referencje:
- MSDN - Getting Started with Rx,
 - MSDN - Reactive Extensions,
 - 101 Rx Samples,
 - ReactiveX,
 - Code Project,
 - GitHub
 
Wcześniejszy: Programowanie Reaktywne - Kto za tym stoi? - Scheduler.
Następny: Programowanie Reaktywne - Zabawa z czasem - Interval
    