Programowanie Reaktywne - Zabawa z czasem - Throttle.

11.02.2018


Artykuł ten jest częścią serii artykułów na temat Programowania reaktywnego.

Zapraszam na GitHub-a.

Tematy

  1. Wstęp
  2. Zabawa z czasem - Timer
  3. Kto za tym stoi? - Scheduler
  4. Nie zapominaj - Subscribe
  5. Zabawa z czasem - Interval
  6. Zabawa z czasem - Buffer
  7. Zabawa z czasem - Delay
  8. Zabawa z czasem - Sample
  9. Zabawa z czasem - Throttle
  10. Zabawa z czasem - Timestamp/TimeInterval
  11. Tworzymy dane - Generators
  12. Tworzymy dane - Własna klasa publikująca
  13. Marudzimy - Skip
  14. Marudzimy - Take
  15. Łap To! - ConsoleKey
  16. Kombinatorzy - Concat
  17. Kombinatorzy - Repeat
  18. Kombinatorzy - Start With
  19. Kombinatorzy - Ambiguous
  20. Kombinatorzy - Merge
  21. Kombinatorzy - Zip
  22. Kombinatorzy - Switch
  23. Kombinatorzy - When-And-Then
  24. Kombinatorzy - Combine Latest
  25. Transformers - Select
  26. Transformers - OfType and Cast
  27. Transformers - Metadata
  28. Bileciki do kontroli - Unit Tests of Interval
  29. Bileciki do kontroli - Unit Tests of Observer Interval
  30. Bileciki do kontroli - Unit Tests of Create Cold/Hot Observable
  31. Szpryca - AutoFac

Wstęp

Reactive Extensions - Throttle Jazda z koksem… Dzisiaj poruszymy tematykę - kolejnego operatora związanego z czasem. Hurra… :| Znowu?? To jednak już przed ostatnia część związana z czasem… Zapraszam ponownie do krainy reaktywnej magii.

Observable.Throttle

Dzisiaj mamy ciekawy obiekt. Jego zachowanie bywa często przydatne, zwłaszcza gdy chcemy wykryć sytuację bezczynności użytkownika.

Publikowanie treści na strumieniu wyzwalane jest w tym przypadku gdy zostanie wykryta bezczynność na bazowym strumieniu powyżej określonego czasu.

W tym celu przygotowałem projekt o nazwie Throttle znajdujący się na GitHub.

Zbudowałem sztuczny twór samoczynnie blokujący i zwalniający proces publikowania treści do strumienia z wykorzystaniem pętli i opóźnienia Throw.Sleep.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public IObservable<int> Stream => GetStream().ToObservable();

private IEnumerable<int> GetStream()
{
	var i = 0;

	while (!_end)
	{
		while (!_publish)
		{
			Thread.Sleep(100);
		}

		Console.Write($"{i},");

		yield return i++;

		Thread.Sleep(100);
	}
}

Reactive Extensions - Throttle

Jak widać na powyższym fragmencie, pętla while odpowiada za publikowanie danych na strumień. Natomiast pole _publish - określa czy publikować treści, czy czekać na zmianę statusu.

Dla uproszczenia konwersję z IEnumerable => IObservable wykonują w klasie pod właściwością: Stream. Do tego celu wykorzystałem rozszerzenie ToObservable z Rx-ów. Pozwalające właśnie na taką zamianę.

Kolejny fragment to użycie Observable.Interval do zmiany statusu publikacji danych na strumieniu dla Observable.Throttle. Dodatkowo wyświetlanych jest trochę informacji na temat zmiany.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var switcherTimeSpan = TimeSpan.FromSeconds(5);
var switcher = Observable.Interval(switcherTimeSpan);
_switcherSubscribe = switcher.Subscribe(x =>
	{
	_publish = !_publish;

	var swithcTo = _publish ? "publish (unlock stream)" : "not publish (lock stream)";

	Console.WriteLine();
	Console.WriteLine($"Switch to {swithcTo}");
	Console.WriteLine();
	},
	Console.WriteLine,
	() => { Console.WriteLine("Switcher completed"); });
}

Uprościłem także kod samego Subscribe, poprzez bezpośrednie wykorzystanie Console.WriteLine jako OnError bez konieczności użycia lambdy.

Ostatni fragment to już wykorzystanie klasy StreamPublisher. Zapisanie się na strumień przy pomocy Throttle. I logowanie tych operacji. Także tutaj skorzystałem z Timestamp w celu opakowania danych stemplem czasowym.

1
2
3
4
5
6
7
8
var streamPublisher = new StreamPublisher();

var throttle = streamPublisher.Stream.Throttle(TimeSpan.FromMilliseconds(500));

var throttleSubscribe = throttle.Timestamp().Subscribe(
	x => Console.WriteLine("Last item {0}: {1}", x.Value, x.Timestamp),
	Console.WriteLine,
	() => Console.WriteLine("Throttle completed"));

Tym samym jeżeli na strumień będą publikowane dane i publikacja zostanie wstrzymana na więcej niż 500ms. To wówczas Throttle wyświetli ostatnio wysłaną danę.

Zakończenie

Zapraszam do zabawy z przykładem jaki zamieściłem na GitHub. Samodzielnego poeksperymentowania. Analizy. Krytykowania błędów autora ;). Przecież nie jestem Alfą i Omegą.


Jest to post wchodzący w skład podjętego wyzwania ogłoszonego przez MIROBURN we vlogu z dnia 3 lutego 2018 roku.

Celem wyzwania jest systematyczne działanie w ciągu 30 dni.

Postanowiłem pisać post dziennie o tematyce Programowania Reaktywnego dla platformy .NET.

Wszelkie źródła związane z postami znajdują się na repozytorium GitHub.

Stan obecny wyzwania: 30 z 30 dni.


Referencje:


Wcześniejszy: Programowanie Reaktywne - Zabawa z czasem - Sample

Następny: Programowanie Reaktywne - Zabawa z czasem - Timestamp/TimeInterval.


Zapisz się na listę :)