async ile Eşzamanlılığı Uygulamak
Bu bölümde async yaklaşımını, 16. bölümde iş parçacıklarıyla ele aldığımız
eşzamanlılık problemlerinin bazılarına uygulayacağız. Orada temel fikirlerin
çoğunu zaten konuştuğumuz için, burada daha çok iş parçacıkları ile future’lar
arasındaki farklara odaklanacağız.
Birçok durumda async ile eşzamanlılık kurmaya yarayan API’ler, iş
parçacıklarıyla çalışırken kullandıklarımıza oldukça benzer. Bazı durumlarda ise
tam tersine belirgin biçimde farklılaşırlar. Ayrıca API’ler dışarıdan benzer
göründüğünde bile, çoğu zaman davranışları ve neredeyse her zaman performans
özellikleri farklıdır.
spawn_task ile Yeni Bir Görev Oluşturmak
- bölümde “
spawnile Yeni Bir İş Parçacığı Oluşturmak” kısmında yaptığımız ilk örnek, iki ayrı iş parçacığında sayaç arttırmaktı. Aynı şeyi bu kezasyncile yapalım.trplcrate’i,thread::spawnAPI’sine çok benzeyenspawn_taskfonksiyonunu vethread::sleep’in eşzamansız karşılığı olansleepfonksiyonunu sunar. Bunları birlikte kullanınca 17-6 numaralı listedeki örneği elde ederiz.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
trpl::spawn_task(async {
for i in 1..10 {
println!("birinci görevden merhaba sayı {i}!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
for i in 1..5 {
println!("ikinci görevden merhaba sayı {i}!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
}
Başlangıç olarak main fonksiyonunu trpl::block_on ile sarıyoruz; böylece en
üst düzey akışımız eşzamansız çalışabiliyor.
Not: Bu noktadan sonra bölümdeki her örnek
mainiçinde aynıtrpl::block_onsarmalamasını kullanacak. Tıpkımainfonksiyonunu çoğu örnekte açıkça göstermediğimiz gibi, burada da zaman zaman bunu atlayacağız. Kendi kodunuza eklemeyi unutmayın.
Ardından blok içinde iki döngü yazıyoruz. Her ikisinde de yarım saniye bekleyen
bir trpl::sleep çağrısı var. Döngülerden birini trpl::spawn_task içine,
ötekini de üst düzey for döngüsü olarak bırakıyoruz. sleep çağrılarından
sonra da await ekliyoruz.
Bu kod, iş parçacıklı sürüme çok benzer davranır. Kendi terminalinizde çalıştırdığınızda mesajların sırası yine farklı olabilir:
ikinci görevden merhaba sayı 1!
birinci görevden merhaba sayı 1!
birinci görevden merhaba sayı 2!
ikinci görevden merhaba sayı 2!
birinci görevden merhaba sayı 3!
ikinci görevden merhaba sayı 3!
birinci görevden merhaba sayı 4!
ikinci görevden merhaba sayı 4!
birinci görevden merhaba sayı 5!
Ama bu sürüm, ana async blok içindeki for döngüsü biter bitmez sonlanır.
Çünkü spawn_task ile başlattığımız görev, main sona erdiğinde kapatılır.
İlk görevin gerçekten sonuna kadar çalışmasını istiyorsak, onun bitmesini
beklemek için bir join tutamacı kullanmamız gerekir. İş parçacıklarında bunu
join ile yapmıştık. 17-7 numaralı listede aynı işi await ile yapıyoruz;
çünkü görev tutamacının kendisi zaten bir future’dır. Output türü Result
olduğu için await sonrasında unwrap da çağırıyoruz.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let tutamac = trpl::spawn_task(async {
for i in 1..10 {
println!("birinci görevden merhaba sayı {i}!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
for i in 1..5 {
println!("ikinci görevden merhaba sayı {i}!");
trpl::sleep(Duration::from_millis(500)).await;
}
tutamac.await.unwrap();
});
}
await kullanmakBu güncellenmiş sürüm her iki döngü de tamamlanana kadar çalışır:
ikinci görevden merhaba sayı 1!
birinci görevden merhaba sayı 1!
birinci görevden merhaba sayı 2!
ikinci görevden merhaba sayı 2!
birinci görevden merhaba sayı 3!
ikinci görevden merhaba sayı 3!
birinci görevden merhaba sayı 4!
ikinci görevden merhaba sayı 4!
birinci görevden merhaba sayı 5!
birinci görevden merhaba sayı 6!
birinci görevden merhaba sayı 7!
birinci görevden merhaba sayı 8!
birinci görevden merhaba sayı 9!
Şimdilik async ile iş parçacıkları benzer sonuçlar veriyor gibi görünüyor;
sadece sözdizimi farklı. Burada join tutamacında join çağırmak yerine
await kullanıyoruz ve sleep çağrılarını da bekliyoruz.
Asıl büyük fark, bunun için ayrı bir işletim sistemi iş parçacığı açmak zorunda
olmamamız. Hatta burada görev başlatmak bile şart değil. Çünkü async bloklar,
derleme sonrasında adsız future’lara dönüşür. Dolayısıyla her döngüyü bir async
blok içine koyup, ikisini de trpl::join ile birlikte sonuna kadar
çalıştırabiliriz.
- bölümde “Tüm İş Parçacıklarının Bitmesini Beklemek” kısmında
std::thread::spawndönüşü olanJoinHandleüstündejoinmetodunu kullanmıştık.trpl::joinbunun future’lar için olan karşılığı gibidir. İki future verirseniz, size bu ikisi birlikte tamamlandığında her ikisinin çıktısını taşıyan bir demet döndüren yeni bir future üretir. Bu yüzden 17-8 numaralı listedefut1ilefut2yi ayrı ayrı beklemek yerine,trpl::jointarafından üretilen yeni future’ı bekliyoruz.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let fut1 = async {
for i in 1..10 {
println!("birinci görevden merhaba sayı {i}!");
trpl::sleep(Duration::from_millis(500)).await;
}
};
let fut2 = async {
for i in 1..5 {
println!("ikinci görevden merhaba sayı {i}!");
trpl::sleep(Duration::from_millis(500)).await;
}
};
trpl::join(fut1, fut2).await;
});
}
trpl::join kullanmakBu kodu çalıştırdığımızda iki future’ın da sonuna kadar yürüdüğünü görürüz:
birinci görevden merhaba sayı 1!
ikinci görevden merhaba sayı 1!
birinci görevden merhaba sayı 2!
ikinci görevden merhaba sayı 2!
birinci görevden merhaba sayı 3!
ikinci görevden merhaba sayı 3!
birinci görevden merhaba sayı 4!
ikinci görevden merhaba sayı 4!
birinci görevden merhaba sayı 5!
birinci görevden merhaba sayı 6!
birinci görevden merhaba sayı 7!
birinci görevden merhaba sayı 8!
birinci görevden merhaba sayı 9!
Bu kez sıra her çalıştırmada aynı kalır. Bu, iş parçacıklarıyla ve 17-7’deki
trpl::spawn_task kullanımıyla gördüğümüzden farklıdır. Nedeni, trpl::join
fonksiyonunun adil (fair) davranmasıdır: her future’ı sırayla yoklar, biri
hazır diye ötekinin önüne geçmesine izin vermez. İş parçacıklarında hangi
iş parçacığının ne kadar süre koşacağını işletim sistemi belirler. Async
Rust’ta ise bu kararı çalışma zamanı verir.
Şu varyasyonları deneyip ne yaptıklarını gözlemleyin:
- Döngülerden birinin ya da ikisinin etrafındaki async bloğu kaldırın.
- Her async bloğu tanımladıktan hemen sonra
awaitedin. - Yalnızca ilk döngüyü async blok içine koyun; sonra oluşan future’ı ikinci döngü tamamlandıktan sonra bekleyin.
Dilerseniz önce çıktının ne olacağını tahmin edip sonra kodu çalıştırarak kontrol edin.
İki Görev Arasında Mesaj İletimiyle Veri Göndermek
Future’lar arasında veri paylaşmak da tanıdık gelecek: yine mesaj iletimi kullanacağız, ama bu kez türlerin ve fonksiyonların eşzamansız sürümleriyle. Temel farkları daha iyi görmek için 16. bölümdeki “İş Parçacıkları Arasında Mesaj İletimiyle Veri Taşımak” kısmına göre biraz farklı bir rota izleyeceğiz. 17-9 numaralı listede yalnızca tek bir async blokla başlıyoruz; henüz ayrı bir görev başlatmıyoruz.
extern crate trpl; // required for mdbook test
fn main() {
trpl::block_on(async {
let (gonderici, mut alici) = trpl::channel();
let deger = String::from("merhaba");
gonderici.send(deger).unwrap();
let alinan = alici.recv().await.unwrap();
println!("alındı '{alinan}'");
});
}
gonderici ve alici değişkenlerine atamakBurada 16. bölümde iş parçacıklarıyla kullandığımız çoklu üretici, tekli
tüketici kanalının eşzamansız sürümü olan trpl::channel kullanılıyor. API’de
yalnızca küçük farklar var: alıcı taraf değiştirilemez değil değiştirilebilir
oluyor ve recv metodu değeri doğrudan vermek yerine, beklememiz gereken bir
future üretiyor. Mesaj göndermek için ayrı bir iş parçacığına ya da ayrı bir
göreve bile ihtiyacımız yok; yalnızca alici.recv çağrısını beklememiz yeterli.
std::mpsc::channel içindeki senkron Receiver::recv, mesaj gelene kadar
bloklar. Buna karşılık trpl::Receiver::recv bloklamaz; çünkü kendisi zaten
eşzamansızdır. Hazır olmadığında denetimi çalışma zamanına geri verir. Biz de
send çağrısını beklemeyiz; çünkü burada kullandığımız kanal sınırsızdır ve
gönderim bloklayıcı değildir.
Not: Bu eşzamansız kodun tamamı
trpl::block_oniçine verdiğimiz async blokta çalıştığı için, blok içindeki her şey bloklamadan ilerleyebilir. Buna karşılık blok dışındaki kod,block_ondönene kadar bekler.block_onun amacı zaten budur: hangi noktada bloklayıp senkron ile async kod arasında geçiş yapacağını sizin seçmenizi sağlar.
Bu örnekte iki önemli şey var. Birincisi, mesaj hemen gelir. İkincisi, burada future kullansak da henüz gerçek anlamda eşzamanlılık yoktur. Listedeki her şey tıpkı future yokmuş gibi sırayla gerçekleşir.
Şimdi mesajları tek tek ve aralarında bekleyerek gönderelim. 17-10 numaralı liste bunu gösteriyor.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (gonderici, mut alici) = trpl::channel();
let degerler = vec![
String::from("merhaba"),
String::from("-den"),
String::from("gelecek"),
String::from("icinden"),
];
for deger in degerler {
gonderici.send(deger).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
while let Some(value) = alici.recv().await {
println!("alındı '{value}'");
}
});
}
await ile beklemekMesajları göndermenin yanı sıra onları almak da gerekiyor. Kaç mesaj geleceğini
bildiğimiz için teoride alici.recv().await çağrısını dört kez elle
yazabilirdik. Ama gerçek hayatta çoğu zaman bilinmeyen sayıda mesaj bekleriz.
Bu yüzden artık mesaj kalmadığını anlayana kadar beklemeyi sürdürmemiz gerekir.
16-10 numaralı listede senkron kanal için for döngüsü kullanmıştık. Rust şu
anda eşzamansız üretilen öğeler için doğrudan for döngüsü sunmadığından,
burada while let koşullu döngüsünü kullanıyoruz. Bu yapı, 6. bölümde
gördüğünüz if let biçiminin döngü karşılığıdır. Desen eşleştiği sürece döngü
devam eder.
alici.recv() çağrısı bir future üretir; biz de onu await ederiz. Mesaj
geldiğinde future Some(message) olarak çözülür. Kanal kapandığında ise artık
değer kalmadığını göstermek için None döner. while let döngüsü bütün bunları
bir araya getirir: sonuç Some(message) ise mesajı alıp gövde içinde
kullanırız; None ise döngü biter.
Bu kod artık bütün mesajları başarıyla gönderip alır; ama iki problem kalır. Birincisi, mesajlar yarım saniye arayla değil, program başladıktan yaklaşık iki saniye sonra topluca gelir. İkincisi, program hiç sonlanmaz; yeni mesaj beklemeye devam eder.
Tek Async Blok İçindeki Kod Doğrusal Çalışır
Mesajların neden tek tek değil de topluca geldiğini anlayarak başlayalım. Bir
async blok içinde await noktaları hangi sıradaysa, kod çalışırken de akış o
sırayla ilerler.
17-10 numaralı listede yalnızca tek bir async blok var; bu yüzden her şey
doğrusaldır. Hâlâ eşzamanlılık yoktur. Bütün gonderici.send çağrıları ve
onların arasındaki trpl::sleep çağrıları tamamlanır; ancak ondan sonra
while let döngüsü recv tarafındaki await noktalarına gelmeye başlar.
İstediğimiz davranış, yani her mesaj arasında gerçekten beklenmesi ise
gönderme ve alma işlemlerini kendi async bloklarına ayırmamızı gerektirir.
Bunu 17-11 numaralı listede yapıyoruz ve iki future’ı trpl::join ile birlikte
çalıştırıyoruz.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (gonderici, mut alici) = trpl::channel();
let gonderici_gelecegi = async {
let degerler = vec![
String::from("merhaba"),
String::from("-den"),
String::from("gelecek"),
String::from("icinden"),
];
for deger in degerler {
gonderici.send(deger).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let alici_gelecegi = async {
while let Some(value) = alici.recv().await {
println!("alındı '{value}'");
}
};
trpl::join(gonderici_gelecegi, alici_gelecegi).await;
});
}
send ile recv işlemlerini ayrı async bloklara bölüp bunların future’larını birlikte beklemekBu güncel sürümle mesajlar iki saniye sonra toplu halde gelmek yerine, yaklaşık 500 milisaniye aralıklarla yazdırılır.
Sahipliği Async Blok İçine Taşımak
Buna rağmen program yine hiç bitmez. Sebep, while let döngüsü ile
trpl::join arasındaki ilişkidir:
trpl::jointarafından dönen future ancak kendisine verilen iki future da bittiğinde tamamlanır.- Gönderen future, son mesajı yolladıktan sonra son uykusunu da tamamlayınca biter.
- Alan future,
while letdöngüsü bitmeden tamamlanmaz. while letdöngüsü,alici.recv().awaitNonedöndürmeden bitmez.recvancak kanalın öbür ucu kapandığındaNonedöndürür.- Kanal, ya
alici.closeçağrılırsa ya da gönderici taraf düşerse kapanır.
Şu anda mesaj gönderen async blok gondericiyi yalnızca ödünç alıyor. Ama onu
blok içine taşıyabilseydik, blok sona erdiğinde gonderici de düşerdi.
13. bölümde move anahtar sözcüğünü kapanışlarla nasıl kullandığımızı, 16.
bölümde de iş parçacıklarıyla çalışırken neden sık sık buna ihtiyaç duyduğumuzu
görmüştünüz. Aynı mantık async bloklar için de geçerlidir. Bu yüzden 17-12
numaralı listede async bloğunu async move yapıyoruz.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (gonderici, mut alici) = trpl::channel();
let gonderici_gelecegi = async move {
// --snip--
let degerler = vec![
String::from("merhaba"),
String::from("-den"),
String::from("gelecek"),
String::from("icinden"),
];
for deger in degerler {
gonderici.send(deger).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let alici_gelecegi = async {
while let Some(value) = alici.recv().await {
println!("alındı '{value}'");
}
};
trpl::join(gonderici_gelecegi, alici_gelecegi).await;
});
}
Bu sürümü çalıştırdığımızda, son mesaj alındıktan sonra program düzgün biçimde sona erer. Şimdi birden fazla future’dan veri göndermek için nelerin değişmesi gerektiğine bakalım.
join! Makrosuyla Birden Fazla Future’ı Birleştirmek
Bu eşzamansız kanal aynı zamanda çoklu üreticili olduğu için, birden fazla
future içinden mesaj göndermek istersek gonderici üstünde clone
çağırabiliriz. 17-13 numaralı liste bunu gösteriyor.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (gonderici, mut alici) = trpl::channel();
let gonderici1 = gonderici.clone();
let gonderici1_gelecegi = async move {
let degerler = vec![
String::from("merhaba"),
String::from("-den"),
String::from("gelecek"),
String::from("icinden"),
];
for deger in degerler {
gonderici1.send(deger).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let alici_gelecegi = async {
while let Some(value) = alici.recv().await {
println!("alındı '{value}'");
}
};
let gonderici_gelecegi = async move {
let degerler = vec![
String::from("daha"),
String::from("fazla"),
String::from("mesaj"),
String::from("sana"),
];
for deger in degerler {
gonderici.send(deger).unwrap();
trpl::sleep(Duration::from_millis(1500)).await;
}
};
trpl::join!(gonderici1_gelecegi, gonderici_gelecegi, alici_gelecegi);
});
}
Önce ilk async bloktan önce gondericiyi klonlayıp gonderici1 oluşturuyoruz.
Sonra 17-12’de yaptığımız gibi gonderici1i ilk bloğun içine taşıyoruz. Daha
sonra da orijinal gondericiyi yeni bir async blok içine taşıyıp biraz daha
yavaş aralıklarla başka mesajlar gönderiyoruz.
Mesaj gönderen iki async blok da async move olmak zorunda. Aksi halde
gonderici ile gonderici1 bloklar tamamlandığında düşmez ve yine sonsuz döngü
sorununa geri döneriz.
Son olarak trpl::join yerine trpl::join! kullanıyoruz. Çünkü bu makro,
derleme zamanında kaç tane future olacağını bildiğimiz durumlarda istediğimiz
sayıda future’ı birlikte beklememize izin verir.
Böylece iki gönderen future’dan gelen bütün mesajları görürüz. Ayrıca iki future farklı gecikmeler kullandığı için, mesajlar da farklı zaman aralıklarında ulaşır:
alındı 'merhaba'
alındı 'daha'
alındı '-den'
alındı 'gelecek'
alındı 'fazla'
alındı 'icinden'
alındı 'mesaj'
alındı 'sana'
Bu bölümde future’lar arasında mesaj iletimini, bir async blok içindeki kodun neden sıralı çalıştığını, sahipliğin async blok içine nasıl taşındığını ve birden fazla future’ın nasıl birleştirildiğini gördük. Şimdi de çalışma zamanına başka bir göreve geçebileceğini ne zaman ve neden söylememiz gerektiğine bakalım.