Artykuł ten jest częścią serii artykułów na temat Programowania reaktywnego.
Zapraszam na GitHub-a.
Tematy
- Wstęp
- Zabawa z czasem - Timer
- Kto za tym stoi? - Scheduler
- Nie zapominaj - Subscribe
- Zabawa z czasem - Interval
- Zabawa z czasem - Buffer
- Zabawa z czasem - Delay
- Zabawa z czasem - Sample
- Zabawa z czasem - Throttle
- Zabawa z czasem - Timestamp/TimeInterval
- Tworzymy dane - Generators
- Tworzymy dane - Własna klasa publikująca
- Marudzimy - Skip
- Marudzimy - Take
- Łap To! - ConsoleKey
- Kombinatorzy - Concat
- Kombinatorzy - Repeat
- Kombinatorzy - Start With
- Kombinatorzy - Ambiguous
- Kombinatorzy - Merge
- Kombinatorzy - Zip
- Kombinatorzy - Switch
- Kombinatorzy - When-And-Then
- Kombinatorzy - Combine Latest
- Transformers - Select
- Transformers - OfType and Cast
- Transformers - Metadata
- Bileciki do kontroli - Unit Tests of Interval
- Bileciki do kontroli - Unit Tests of Observer Interval
- Bileciki do kontroli - Unit Tests of Create Cold/Hot Observable
- Szpryca - AutoFac
Wstęp
Swego czasu był taki film gdzie głównie bohaterowie zamieniają się między sobą swoimi ciałami. W przypadku Rx-ów do czynienia mamy z metodą pozwalając przełączać się pomiędzy strumieniami. Warunkiem przełączenia jest wiek publikowanych danych.
Observable.Switch
Samą implementację rozpoczynam od stworzenia listy generatorów. Lista ta będzie niezbędna w procesie budowania Switch-a. Listę trzeba uprzednio przekonwertować na odpowiedni typ. W tym przypadku będzie to typ: IObservable<IObservable<Randomizer>>
. Do konwertowania wykorzystujemy wbudowaną funkcjonalność w Rx-ach: ToObservable.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var randomedList = new List<IObservable<Randomed>>();
randomedList.Add(ObservableRandomizer.Create(0, 100));
randomedList.Add(ObservableRandomizer.Create(1000, 2000));
randomedList.Add(ObservableRandomizer.Create(0, 100));
for (var i = 0; i < randomedList.Count; i++)
{
randomedList[i].DefaultPrint($"Randomizer{i}");
}
var observableRandomedList = randomedList.ToObservable();
var observableSequence = observableRandomedList.Switch();
var subscription1 = observableSequence.DefaultPrint("Switch");
Po wykonaniu tej operacji możemy dokonać zapisu na nowo powstały strumień np. przy pomocy metody DefaultPrint. Tworzenie Observable.Switch można wykonać w dwojaki sposób:
- Observable.Switch(poszczególne sekwencje oddzielone średnikiem),
- .Switch() ten sposób określamy korzystanie z nowej funkcjonalności bezpośrednio na przekonwertowanej do typu IObservable.
Ale czym jest ten Switch. Otóż odpowiedzialny jest on za wybór strumienia jaki będzie wykorzystany do publikacji wyników pochodzących od dystrybutora powstałego na wskutek działania metody Switch. Jego główne założenie, mówi że publikowane są dane jedynie z najświeższego strumienia jaki został podpięty.
Do projektu z Rx-ami dodałem nową klasę Randomed, która to będzie zawierała informacje generowane przez wątek na jakim uruchomiona jest publikacja.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
namespace RXLib.Randomizer
{
public class Randomed
{
public int Value { get; set; }
public long Ticks { get; set; }
public Randomed()
{
Value = 0;
Ticks = 0;
}
public override string ToString()
{
return $"{Ticks}->{Value}";
}
Dodatkowo stworzyłem klasę o nazwie: ObservableRandomizer zawierającą poniższy wątek jaki odpowiedzialny jest za publikowanie danych dos strumienia.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void StartThread()
{
_threadInProgress = true;
_thread = new Thread(ThreadMethod);
_thread.Start();
}
private void ThreadMethod()
{
while (_threadInProgress)
{
var delay = new Random(DateTime.Now.Millisecond).Next(200, 1000);
Thread.Sleep(delay);
_randomed.Ticks = DateTime.Now.Ticks;
_randomed.Value = new Random(DateTime.Now.Millisecond).Next(_from, _to);
Publish();
}
}
Owe działania odbywają się w wątkach. Dlatego, że aplikacja ma działać samodzielnie. Przygotowując a następnie publikując dane.
Głównym celem tworzenia strumienia opartego o klasę ObservableRandomizer jest instancja dystrybutora losowych danych (z podanego zakresu). By wprowadzić dodatkową losowość czasu publikowania danych na strumień wykorzystałem opóźnienie z losową wartością dla Thread.Sleep.
Zakończenie
W trakcie rozwoju programu dotyczącego Switch. Napisałem dwie klasy pomocnicze, jednak nie są one obecnie wykorzystywane. Pierwsza z nich dodaje do strumienia Timestamp, a następnie go wyświetla.
1
2
3
4
5
6
public static IDisposable PrintWithTimestamp<T>(this IObservable<T> source, string identify)
{
var subscription = source
.Timestamp()
.Subscribe(
item =>
Analogicznie działa druga metoda o nazwie: PrintWithTimeInterval. Jedyna różnica jaka tutaj się znajduje to korzystanie w tym przypadku z TimeInterval
1
2
3
4
5
6
public static IDisposable PrintWithTimeInterval<T>(this IObservable<T> source, string identify)
{
var subscription = source
.TimeInterval()
.Subscribe(
item =>
W obecnym projekcie na chwilę obecną nie owe metody nie są wykorzystywane. Ich implementację napisałem w trakcie pracy nad dzisiejszym aktorem czyli metodą Switch.
Jest to post wchodzący w skład podjętego wyzwania ogłoszonego przez MIROBURN we vlogu z dnia 3 lutego 2018 roku.
Celem wyzwania jest systematyczne działanie w ciągu 30 dni.
Postanowiłem pisać post dziennie o tematyce Programowania Reaktywnego dla platformy .NET.
Wszelkie źródła związane z postami znajdują się na repozytorium GitHub.
Stan obecny wyzwania: 30 z 30 dni.
Referencje:
- MSDN - Getting Started with Rx,
- MSDN - Reactive Extensions,
- 101 Rx Samples,
- ReactiveX,
- Code Project,
- GitHub
Wcześniejszy: Programowanie Reaktywne - Kombinatorzy - Zip
Następny: Programowanie Reaktywne - Kombinatorzy - When-And-Then