rxjs

Halil Tevfik
4 min readJun 20, 2021
stream

giriş

rxjs ne?

rxjs, reactive extensions adıyla farklı platformlarda uygulama içerisindeki akışkan (anlatacağım) verileri biçimlendirip uygulamada farklı eylemler almamızı sağlayan bir kütüphane. evet, bu cümle pek bir şey anlatmıyor farkındayım. o yüzden biraz örneklendirerek biraz da elimizde sadece rxjs varken dahi neler yapabileceğimizi anlatarak pekiştirmeye çalışacağım.

akışkan (zaman içerisinde değişebilen) veriler var dedik, bunları biçimlendirebiliyoruz (manipüle ediyoruz) dedik başka şeyler de diyeceğiz ama önce bir durup soralım;

  • neden?
  • neden bu veriler zaman içerisinde değişiyor?
  • neden verileri manipüle ediyoruz?

rxjs neden?

bu sorular çok zor sorular değil, teşekkür ederim. sırasıyla;

  • işimizi kolaylaştırıyor
  • kullanıcının yaptığı eylemlere tepki veriyoruz ya da uygulamamızda zamana bağlı olan veriler var
  • verileri olması gereken şekle sokmak için. basit bir örnek olarak, farklı konumlardaki kullanıcılara yerelleştirilmiş bilgiler sunuyoruz

rxjs nasıl?

basitçe anlatmak gerekirse, rxjs bize adına Observable (gözlemlenebilir) dediğimiz nesneler veriyor ve bunları subscribe (abone olmak) metoduyla izlemeye başlıyoruz.

peki veri nerede oluşuyor?

belirli kaynaklardan gelen verileri Observable nesnelerine dönüştürmemizi sağlayan from, fromEvent, of (yine, anlatacağım…) gibi yardımcı işlevlerle beraber asıl kaynaklar Subject (özne) ve bu sınıftan türeyip ek işlevler veren ReplaySubject, BehaviourSubject, AsyncSubject sınıflarıdır.

örnek kod parçasında bir Subject oluşturup sırasıyla 1, 2 ve 3 verilerini paylaştık. şu an için herhangi bir Subscriber (abone) olmadığı için güzelim veriler boşlukta süzülüp gitti…

şimdi ise, verileri paylaşmadan önce subscribe metoduyla dinlemeye başladığımız Subject, gelen verilerin hepsinin bize ulaşmasını sağladı. ama hala bir sorun var, unsubscribe metoduyla işimiz bittiğinde verileri dinlemeyi bırakmadık.

sorunsuz bir kod parçasını yazmış olduk, bu kod parçası her çalıştığında RAM’de gereksiz veri bırakmayacak. bir başka deyişle kod parçasında artık memory leak yok.

ReplaySubject, BehaviorSubject ve AsyncSubject

Subject tek başına çoğu işin altından kalkabilse de, yetişemediği noktalar oluyor. örneğin; dinlemeye başladığınız andan önceki hiçbir veriden haberiniz olmuyor. bu durumda yardımımıza ReplaySubject koşacak. ReplaySubject sınıfından bir nesne oluştururken isteğe bağlı olan bufferSize (tampon boyutu) parametresi son paylaşılan n adet (tampon boyutuyla belirttiğiniz kadar) her dinlemeye başladığımızda yeniden paylaşacak.

peki ne işimize yarayacak?

yukarıdaki kod parçasında authorize adındaki (nasıl çalıştığı önemli olmayan) yardımcı işlevin döndürdüğü session nesnesinden kullanıcı bilgisini öznemizden paylaşmasını istedik. eğer direkt olarak Subject kullanmış olsaydık, ileride şu anki kullanıcı bilgilerine erişmek istediğimizde başarısız olacaktık. ReplaySubject ise, ne zaman istersek dinleyebildiğimiz ve son paylaşılan istediğimiz sayıdaki veriyi bizim için saklıyor.

bunun yanı sıra BehaviorSubject de benzer bir görev üstleniyor. son paylaşılan n veriyi yeniden paylaşmak yerine, her dinlenilmeye başlandığında sadece son veriyi paylaşıp, value özelliğinde tutuyor. bu sayede eğer istersek son paylaşılan veriye direkt ulaşabiliyoruz. BehaviorSubject nesnesi oluşturulurken belirtmemiz gereken bir argüman var, öznenin başlangıç değeri.

yukarıdaki örnekte fibonacci serisinin ilk 6 elemanını paylaşmış olduk. şu an bir Subscriber olmadığı için ilk 5 değer yine boşluğa doğru süzüldü. ama merak etmeyin! son değer, yani 8 hala yeni aboneler için ulaşılabilir olacak ve aynı zamanda öznenin value özelliğinde (fibonacci.value) senkron olarak da ulaşılabilecek.

AsyncSubject anlatımına başlamadan önce henüz bahsetmediğim bir şey var; Observable nesnelerinde verileri paylaşmanın sonu gelebiliyor. subscribe metoduyla gelen her yeni veriyi dinleyebildiğimiz gibi, o öznenin işleyişinde olan hatayı (Observable paylaştığı ilk hatadan sonra son bulur) ve öznenin tükenmesini de izleyebiliriz.

yine örnekteki gibi gelen her yeni veri, oluşan ilk hata ve öznenin sonlanıp, sonlanmadığı dinlenilebilir.

artık bunları biliyorsak AsyncSubject ne işe yarıyor anlayabiliriz. AsyncSubject zaman içerisinde değişkenlik gösterdiğini belirttiğimiz diğer öznelere zıt olarak, sadece sonlandığında, sonlanmadan önce gelen son veriyi paylaşıyor. yani Promise gibi sadece bir değeri var.

rxjs/operators diye bir şey duydum?

şu ana kadar detaylı bir şekilde Observable ve Subject ne işe yarıyor bunlardan bahsettik. ama fark ettiyseniz herhangi bir veri şekillendirmesi olmadı. bu kısımda bize operatorler (Türkçe’m bitti) yardımcı olacak. basit bir operator olan map gelen veriye direkt şekil vermemizi sağlıyor. operatorleri kullanmak için Observable ve Subject nesnelerinin pipe (boru, kullanınca anlaşılır olacak) metodunu kullanacağız.

ilk örneğimiz olan bu özneden gelen veriler tamsayı ve biz (en azından ben) bu sayıların karesini almak istiyorum.

bu kadar basit! bundan sonra özne yerine pipe metodunun döndürdüğü Observable nesnesini dinlememiz yeterli. dinlediğimizde alacağımız sonuç ise sırasıyla (bariz bir şekilde) 2, 4, … olacak.

tamam da bu yetmez ki!

dediğinizi duyar gibiyim. sabırsız da olabilirsiniz ama rxjs öğrenme eğrisi fazla yatay olan bir teknoloji, yani zaman alıyor. şimdi, gerçek hayat örneği olması için socket.io uygulamamızdan gelen verileri dinlemek istediğimizi varsayalım. daha önce bahsettiğimiz fromEvent yardımcı işlevini şu an kullanacağız.

örnek kod parçası (eğer bir socket.io sunucumuz varsa) sorunsuz çalışacaktır. çalışır çalışmasına da, sanki bu kod parçasında bir şeyler eksik gibi? bir kaç operator kullanırsak daha tatlı olabilir gibi geldi. örneğin burada socket.io client’ını da bir observable olarak sağlayabiliriz ve böylece kullanıcı giriş yada çıkış yaptığında sunucuya tekrar bağlanabiliriz. neden olmasın?

burada $token öznesinin üstüne çok gitmeyin, bir yerlerden bize sağlandığını bilmemiz yeterli. bunu bir kenara bıraktıktan sonra $client Observable’ı bu $token’ı dinliyor ve zaman içerisindeki değişimlere göre yeni token ile yeni socket bağlantıları açıyor. sondaki shareReplay(1) operatörüne dikkat edelim, bu operatörün amacı; yeni gelen subscriberlara önceden var olan bir socket varsa bunu sağlamak ve aynı zamanda son değeri yeniden paylaşmak. hani en başta demiştik ya, eğer subscribe olmazsak veriler boşlukta süzülüyor diye; bu operatör belirttiğimiz sayıda veriyi hatırlayıp bize belirtiyor. bizim bu iş için hatırlamamız gereken tek şey son veri, yada daha açık bir şekilde son socket bağlantısı olduğu için “1” diye belirttik. aynı şeyi listen işlevi de kullanıyor dikkat ederseniz. buradaki amacımız ise, ileride bir yerlerde oluşturduğumuz yeni Observable’ları oluşturduğumuz yerdeki subscriberlar arasında paylaşabilmek.

bu makale daha fazla uzamaya devam etmeden önce sonlandırıp, sonraki makalelere de konu bırakalım.

ben teyfik, bugün sizlere rxjs teknolojisinin ne işe yaradığını ve neden kullanıldığını özetledim. bir sonraki makalede görüşmek ve belki de bir gün bir yerlerde karşılaşmak üzere, hoşça kalın…

--

--

Halil Tevfik

JavaScript teknolojileri hakkında bilgili ve yenilerine hızla adapte olabilen bir Lead Web Developer.