Artykuł ten jest częścią serii artykułów na temat Programowania reaktywnego.
Zapraszam na GitHub-a.
Tematy
- Wstęp
- Zabawa z czasem - Timer
- Kto za tym stoi? - Scheduler
- Nie zapominaj - Subscribe
- Zabawa z czasem - Interval
- Zabawa z czasem - Buffer
- Zabawa z czasem - Delay
- Zabawa z czasem - Sample
- Zabawa z czasem - Throttle
- Zabawa z czasem - Timestamp/TimeInterval
- Tworzymy dane - Generators
- Tworzymy dane - Własna klasa publikująca
- Marudzimy - Skip
- Marudzimy - Take
- Łap To! - ConsoleKey
- Kombinatorzy - Concat
- Kombinatorzy - Repeat
- Kombinatorzy - Start With
- Kombinatorzy - Ambiguous
- Kombinatorzy - Merge
- Kombinatorzy - Zip
- Kombinatorzy - Switch
- Kombinatorzy - When-And-Then
- Kombinatorzy - Combine Latest
- Transformers - Select
- Transformers - OfType and Cast
- Transformers - Metadata
- Bileciki do kontroli - Unit Tests of Interval
- Bileciki do kontroli - Unit Tests of Observer Interval
- Bileciki do kontroli - Unit Tests of Create Cold/Hot Observable
- Szpryca - AutoFac
Wstęp
Jazda z koksem… Dzisiaj poruszymy tematykę - kolejnego operatora związanego z czasem. Hurra… :| Znowu?? To jednak już przed ostatnia część związana z czasem… Zapraszam ponownie do krainy reaktywnej magii.
Observable.Throttle
Dzisiaj mamy ciekawy obiekt. Jego zachowanie bywa często przydatne, zwłaszcza gdy chcemy wykryć sytuację bezczynności użytkownika.
Publikowanie treści na strumieniu wyzwalane jest w tym przypadku gdy zostanie wykryta bezczynność na bazowym strumieniu powyżej określonego czasu.
W tym celu przygotowałem projekt o nazwie Throttle znajdujący się na GitHub.
Zbudowałem sztuczny twór samoczynnie blokujący i zwalniający proces publikowania treści do strumienia z wykorzystaniem pętli i opóźnienia Throw.Sleep.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public IObservable<int> Stream => GetStream().ToObservable();
private IEnumerable<int> GetStream()
{
var i = 0;
while (!_end)
{
while (!_publish)
{
Thread.Sleep(100);
}
Console.Write($"{i},");
yield return i++;
Thread.Sleep(100);
}
}
Jak widać na powyższym fragmencie, pętla while odpowiada za publikowanie danych na strumień. Natomiast pole _publish - określa czy publikować treści, czy czekać na zmianę statusu.
Dla uproszczenia konwersję z IEnumerable => IObservable wykonują w klasie pod właściwością: Stream. Do tego celu wykorzystałem rozszerzenie ToObservable z Rx-ów. Pozwalające właśnie na taką zamianę.
Kolejny fragment to użycie Observable.Interval do zmiany statusu publikacji danych na strumieniu dla Observable.Throttle. Dodatkowo wyświetlanych jest trochę informacji na temat zmiany.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var switcherTimeSpan = TimeSpan.FromSeconds(5);
var switcher = Observable.Interval(switcherTimeSpan);
_switcherSubscribe = switcher.Subscribe(x =>
{
_publish = !_publish;
var swithcTo = _publish ? "publish (unlock stream)" : "not publish (lock stream)";
Console.WriteLine();
Console.WriteLine($"Switch to {swithcTo}");
Console.WriteLine();
},
Console.WriteLine,
() => { Console.WriteLine("Switcher completed"); });
}
Uprościłem także kod samego Subscribe, poprzez bezpośrednie wykorzystanie Console.WriteLine jako OnError bez konieczności użycia lambdy.
Ostatni fragment to już wykorzystanie klasy StreamPublisher. Zapisanie się na strumień przy pomocy Throttle. I logowanie tych operacji. Także tutaj skorzystałem z Timestamp w celu opakowania danych stemplem czasowym.
1
2
3
4
5
6
7
8
var streamPublisher = new StreamPublisher();
var throttle = streamPublisher.Stream.Throttle(TimeSpan.FromMilliseconds(500));
var throttleSubscribe = throttle.Timestamp().Subscribe(
x => Console.WriteLine("Last item {0}: {1}", x.Value, x.Timestamp),
Console.WriteLine,
() => Console.WriteLine("Throttle completed"));
Tym samym jeżeli na strumień będą publikowane dane i publikacja zostanie wstrzymana na więcej niż 500ms. To wówczas Throttle wyświetli ostatnio wysłaną danę.
Zakończenie
Zapraszam do zabawy z przykładem jaki zamieściłem na GitHub. Samodzielnego poeksperymentowania. Analizy. Krytykowania błędów autora ;). Przecież nie jestem Alfą i Omegą.
Jest to post wchodzący w skład podjętego wyzwania ogłoszonego przez MIROBURN we vlogu z dnia 3 lutego 2018 roku.
Celem wyzwania jest systematyczne działanie w ciągu 30 dni.
Postanowiłem pisać post dziennie o tematyce Programowania Reaktywnego dla platformy .NET.
Wszelkie źródła związane z postami znajdują się na repozytorium GitHub.
Stan obecny wyzwania: 30 z 30 dni.
Referencje:
- MSDN - Getting Started with Rx,
- MSDN - Reactive Extensions,
- 101 Rx Samples,
- ReactiveX,
- Code Project,
- GitHub
Wcześniejszy: Programowanie Reaktywne - Zabawa z czasem - Sample
Następny: Programowanie Reaktywne - Zabawa z czasem - Timestamp/TimeInterval.