GoDev

Blog programisty.

Programowanie Reaktywne - Kombinatorzy - Switch.


Artykuł ten jest częścią serii artykułów na temat Programowania reaktywnego.

Zapraszam na GitHub-a.

Tematy

  1. Wstęp
  2. Zabawa z czasem - Timer
  3. Kto za tym stoi? - Scheduler
  4. Nie zapominaj - Subscribe
  5. Zabawa z czasem - Interval
  6. Zabawa z czasem - Buffer
  7. Zabawa z czasem - Delay
  8. Zabawa z czasem - Sample
  9. Zabawa z czasem - Throttle
  10. Zabawa z czasem - Timestamp/TimeInterval
  11. Tworzymy dane - Generators
  12. Tworzymy dane - Własna klasa publikująca
  13. Marudzimy - Skip
  14. Marudzimy - Take
  15. Łap To! - ConsoleKey
  16. Kombinatorzy - Concat
  17. Kombinatorzy - Repeat
  18. Kombinatorzy - Start With
  19. Kombinatorzy - Ambiguous
  20. Kombinatorzy - Merge
  21. Kombinatorzy - Zip
  22. Kombinatorzy - Switch
  23. Kombinatorzy - When-And-Then
  24. Kombinatorzy - Combine Latest
  25. Transformers - Select
  26. Transformers - OfType and Cast
  27. Transformers - Metadata
  28. Bileciki do kontroli - Unit Tests of Interval
  29. Bileciki do kontroli - Unit Tests of Observer Interval
  30. Bileciki do kontroli - Unit Tests of Create Cold/Hot Observable
  31. Szpryca - AutoFac

Wstęp

Reactive Extensions - Switch Swego czasu był taki film gdzie głównie bohaterowie zamieniają się między sobą swoimi ciałami. W przypadku Rx-ów do czynienia mamy z metodą pozwalając przełączać się pomiędzy strumieniami. Warunkiem przełączenia jest wiek publikowanych danych.

Observable.Switch

Samą implementację rozpoczynam od stworzenia listy generatorów. Lista ta będzie niezbędna w procesie budowania Switch-a. Listę trzeba uprzednio przekonwertować na odpowiedni typ. W tym przypadku będzie to typ: IObservable<IObservable<Randomizer>>. Do konwertowania wykorzystujemy wbudowaną funkcjonalność w Rx-ach: ToObservable.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var randomedList = new List<IObservable<Randomed>>();
randomedList.Add(ObservableRandomizer.Create(0, 100));
randomedList.Add(ObservableRandomizer.Create(1000, 2000));
randomedList.Add(ObservableRandomizer.Create(0, 100));

for (var i = 0; i < randomedList.Count; i++)
{
  randomedList[i].DefaultPrint($"Randomizer{i}");
}

var observableRandomedList = randomedList.ToObservable();

var observableSequence = observableRandomedList.Switch();

var subscription1 = observableSequence.DefaultPrint("Switch");

Po wykonaniu tej operacji możemy dokonać zapisu na nowo powstały strumień np. przy pomocy metody DefaultPrint. Tworzenie Observable.Switch można wykonać w dwojaki sposób:

  • Observable.Switch(poszczególne sekwencje oddzielone średnikiem),
  • .Switch() ten sposób określamy korzystanie z nowej funkcjonalności bezpośrednio na przekonwertowanej do typu IObservable.

Reactive Extensions - Switch

Ale czym jest ten Switch. Otóż odpowiedzialny jest on za wybór strumienia jaki będzie wykorzystany do publikacji wyników pochodzących od dystrybutora powstałego na wskutek działania metody Switch. Jego główne założenie, mówi że publikowane są dane jedynie z najświeższego strumienia jaki został podpięty.

Do projektu z Rx-ami dodałem nową klasę Randomed, która to będzie zawierała informacje generowane przez wątek na jakim uruchomiona jest publikacja.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
namespace RXLib.Randomizer
{
	public class Randomed
	{
		public int Value { get; set; }
		public long Ticks { get; set; }

		public Randomed()
		{
			Value = 0;
			Ticks = 0;
		}

		public override string ToString()
		{
			return $"{Ticks}->{Value}";
		}

Dodatkowo stworzyłem klasę o nazwie: ObservableRandomizer zawierającą poniższy wątek jaki odpowiedzialny jest za publikowanie danych dos strumienia.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void StartThread()
{
  _threadInProgress = true;

  _thread = new Thread(ThreadMethod);
  _thread.Start();
}

private void ThreadMethod()
{
  while (_threadInProgress)
  {
    var delay = new Random(DateTime.Now.Millisecond).Next(200, 1000);
    Thread.Sleep(delay);

    _randomed.Ticks = DateTime.Now.Ticks;
    _randomed.Value = new Random(DateTime.Now.Millisecond).Next(_from, _to);

    Publish();
  }
}

Owe działania odbywają się w wątkach. Dlatego, że aplikacja ma działać samodzielnie. Przygotowując a następnie publikując dane.

Głównym celem tworzenia strumienia opartego o klasę ObservableRandomizer jest instancja dystrybutora losowych danych (z podanego zakresu). By wprowadzić dodatkową losowość czasu publikowania danych na strumień wykorzystałem opóźnienie z losową wartością dla Thread.Sleep.

Zakończenie

W trakcie rozwoju programu dotyczącego Switch. Napisałem dwie klasy pomocnicze, jednak nie są one obecnie wykorzystywane. Pierwsza z nich dodaje do strumienia Timestamp, a następnie go wyświetla.

1
2
3
4
5
6
public static IDisposable PrintWithTimestamp<T>(this IObservable<T> source, string identify)
{
  var subscription = source
    .Timestamp()
    .Subscribe(
      item =>

Analogicznie działa druga metoda o nazwie: PrintWithTimeInterval. Jedyna różnica jaka tutaj się znajduje to korzystanie w tym przypadku z TimeInterval

1
2
3
4
5
6
public static IDisposable PrintWithTimeInterval<T>(this IObservable<T> source, string identify)
{
  var subscription = source
    .TimeInterval()
    .Subscribe(
      item =>

W obecnym projekcie na chwilę obecną nie owe metody nie są wykorzystywane. Ich implementację napisałem w trakcie pracy nad dzisiejszym aktorem czyli metodą Switch.


Jest to post wchodzący w skład podjętego wyzwania ogłoszonego przez MIROBURN we vlogu z dnia 3 lutego 2018 roku.

Celem wyzwania jest systematyczne działanie w ciągu 30 dni.

Postanowiłem pisać post dziennie o tematyce Programowania Reaktywnego dla platformy .NET.

Wszelkie źródła związane z postami znajdują się na repozytorium GitHub.

Stan obecny wyzwania: 30 z 30 dni.


Referencje:


Wcześniejszy: Programowanie Reaktywne - Kombinatorzy - Zip

Następny: Programowanie Reaktywne - Kombinatorzy - When-And-Then