Mesaj İletimiyle İş Parçacıkları Arasında Veri Taşımak
Güvenli eşzamanlılık sağlamak için giderek daha popüler hale gelen yaklaşımlardan biri mesaj iletimi (message passing) modelidir. Bu modelde iş parçacıkları ya da aktörler, veri içeren mesajları birbirlerine göndererek iletişim kurar. Go dilinin belgelerinde geçen şu ünlü söz bunu güzel özetler: “Belleği paylaşarak iletişim kurmayın; iletişim kurarak belleği paylaşın.”
Mesaj göndermeye dayalı eşzamanlılık için Rust standart kütüphanesi kanal uygulaması sunar. Kanal (channel), verinin bir iş parçacığından diğerine gönderilmesini sağlayan genel bir programlama kavramıdır.
Programlamadaki bir kanalı, yönü belli bir su yolu gibi düşünebilirsiniz. Örneğin bir dereye lastik ördek bırakırsanız, ördek akış yönünde ilerleyip su yolunun sonuna kadar gider.
Bir kanalın iki yarısı vardır: gönderici ve alıcı. Gönderici, ördeği suya bıraktığınız yukarı kısımdır; alıcı ise ördeğin aşağı akışta ulaştığı yerdir. Kodunuzun bir bölümü göndermek istediği veriyle gönderici tarafın metodlarını çağırır; başka bir bölümü de alıcı ucunda yeni mesaj gelip gelmediğini kontrol eder. Gönderici ya da alıcı taraflardan biri düşürüldüğünde kanalın kapandığı söylenir.
Burada, değer üreten ve bunları kanal üzerinden gönderen bir iş parçacığı ile bu değerleri alıp ekrana yazdıran başka bir iş parçacığı olan küçük bir program kuracağız. Özelliği göstermek için kanaldan basit değerler göndereceğiz. Bu yönteme alıştıktan sonra, örneğin bir sohbet sistemi ya da hesabın farklı parçalarını yapan birçok iş parçacığının sonuçları tek bir iş parçacığında topladığı sistemler gibi, iletişim kurması gereken her durumda kanalları kullanabilirsiniz.
Önce 16-6 numaralı listede bir kanal oluşturacağız; ama henüz onunla hiçbir şey yapmayacağız. Bunun şimdilik derlenmediğine dikkat edin; çünkü Rust, kanal üzerinden hangi tür değerler göndermek istediğimizi henüz bilemez.
use std::sync::mpsc;
fn main() {
let (gonderici, alici) = mpsc::channel();
}
gonderici ve alici değişkenlerine atamakYeni bir kanal oluşturmak için mpsc::channel fonksiyonunu çağırırız. Buradaki
mpsc, multiple producer, single consumer ifadesinin kısaltmasıdır. Kısaca
Rust standart kütüphanesindeki kanal uygulaması, bir kanalın değer üreten
birden fazla gönderici ucu olmasına izin verir; ama o değerleri tüketen tek
bir alıcı ucu vardır. Bunu birçok küçük akarsuyun birleşip tek bir nehre
dönüşmesine benzetebilirsiniz: Farklı akışlardan gelen her şey sonunda aynı
nehirde toplanır. Şimdilik tek bir göndericiyle başlayacağız; örnek çalışır
hale gelince birden fazla gönderici ekleyeceğiz.
mpsc::channel fonksiyonu bir demet döndürür. İlk eleman gönderme ucu, yani
göndericidir; ikinci eleman ise alma ucu, yani alıcıdır. Birçok alanda
transmitter ve receiver için geleneksel olarak tx ve rx kısaltmaları
kullanılır; biz burada değişkenleri doğrudan gonderici ve alici olarak
adlandırdık. Bu demeti parçalayan bir desenle birlikte let ifadesi
kullanıyoruz. let ifadelerindeki desenler ile parçalayıcı atamayı 19. bölümde
inceleyeceğiz. Şimdilik bilinmesi gereken şu: mpsc::channel dönüşündeki
demetin parçalarını çıkarmak için bu kullanım çok elverişlidir.
Şimdi gönderici tarafı oluşturulan bir iş parçacığına taşıyalım ve oradan tek bir dizgi gönderelim. Böylece oluşturulan iş parçacığı ana iş parçacığıyla iletişim kurmuş olacak. Bu, nehrin yukarı kısmına lastik ördek bırakmaya ya da bir iş parçacığından diğerine sohbet mesajı göndermeye benzer.
use std::sync::mpsc;
use std::thread;
fn main() {
let (gonderici, alici) = mpsc::channel();
thread::spawn(move || {
let deger = String::from("merhaba");
gonderici.send(deger).unwrap();
});
}
gonderici değerini oluşturulan iş parçacığına taşıyıp "merhaba" göndermekYine thread::spawn ile yeni bir iş parçacığı oluşturuyor, ardından move
yardımıyla gonderici değerini kapanışa taşıyoruz. Böylece göndericinin
sahipliği oluşturulan iş parçacığında oluyor. Kanal üzerinden mesaj
gönderebilmek için o iş parçacığının göndericiye sahip olması gerekir.
Gönderici tarafında, göndermek istediğimiz değeri alan bir send metodu vardır.
send metodu Result<T, E> döndürür. Yani alıcı daha önce düşürülmüşse ve
değeri gönderecek yer kalmamışsa, gönderme işlemi hata döndürür. Bu örnekte
hata olursa paniklemek için unwrap çağırıyoruz. Gerçek bir uygulamada ise
bunu uygun şekilde ele almak isteriz; doğru hata yönetimi stratejileri için 9.
bölüme dönebilirsiniz.
16-8 numaralı listede, ana iş parçacığında alıcıdan gelen değeri alacağız. Bu da nehrin sonundan lastik ördeği almak ya da gelen bir sohbet mesajını okumak gibidir.
use std::sync::mpsc;
use std::thread;
fn main() {
let (gonderici, alici) = mpsc::channel();
thread::spawn(move || {
let deger = String::from("merhaba");
gonderici.send(deger).unwrap();
});
let alinan = alici.recv().unwrap();
println!("Alındı: {alinan}");
}
"merhaba" değerini alıp ekrana yazdırmakAlıcı tarafın kullanışlı iki metodu vardır: recv ve try_recv. Burada
receive kelimesinin kısaltması olan recv metodunu kullanıyoruz. Bu metod,
ana iş parçacığını bloklayarak kanal üzerinden bir değer gönderilmesini
bekler. Bir değer geldiğinde onu Result<T, E> içinde döndürür. Gönderici uç
kapandığında ise artık yeni değer gelmeyeceğini belirtmek için hata döndürür.
try_recv metodu ise bloklamaz; hemen Result<T, E> döndürür. O anda mesaj
varsa Ok, yoksa Err gelir. Bu, mesaj beklerken aynı iş parçacığının başka
işleri de varsa kullanışlıdır: Arada bir try_recv çağırabilir, mesaj geldiyse
onu işleyebilir, gelmediyse kısa süre başka işler yapıp sonra yeniden
kontrol edebilirsiniz.
Bu örnekte sadelik için recv kullandık; çünkü ana iş parçacığının yapacak
başka bir işi yok, yalnızca mesaj gelmesini bekliyor.
16-8 numaralı listedeki kodu çalıştırdığımızda, ana iş parçacığından şu değerin yazdırıldığını görürüz:
Alındı: merhaba
Tam istediğimiz gibi!
Kanallar Üzerinden Sahiplik Aktarmak
Mesaj gönderirken sahiplik kuralları çok kritik bir rol oynar; çünkü güvenli
eşzamanlı kod yazmanızı sağlar. Rust programlarınız boyunca sahipliği düşünmek,
eşzamanlı programlamadaki hataları önlemenin büyük bir parçasıdır. Kanallar ile
sahipliğin birlikte nasıl çalıştığını görmek için küçük bir deney yapalım:
Oluşturulan iş parçacığında deger adlı bir değeri kanaldan gönderdikten
sonra yeniden kullanmaya çalışacağız. 16-9 numaralı listedeki kodu
derlemeyi deneyin; neden buna izin verilmediğini göreceksiniz.
use std::sync::mpsc;
use std::thread;
fn main() {
let (gonderici, alici) = mpsc::channel();
thread::spawn(move || {
let deger = String::from("merhaba");
gonderici.send(deger).unwrap();
println!("değer şu: {deger}");
});
let alinan = alici.recv().unwrap();
println!("Alındı: {alinan}");
}
deger kanal üzerinden gönderildikten sonra onu yeniden kullanmaya çalışmakBurada deger değişkenini gonderici.send ile kanala yolladıktan sonra
ekrana yazdırmaya çalışıyoruz. Buna izin verilmesi kötü olurdu; çünkü değer
başka bir iş parçacığına gönderildiği anda, o iş parçacığı bizim yeniden
kullanmaya çalışmamızdan önce değeri değiştirebilir ya da düşürebilirdi.
Böyle bir durumda tutarsız ya da hiç var olmayan veri yüzünden beklenmedik
sonuçlar ve hatalar ortaya çıkabilirdi. Neyse ki Rust, 16-9 numaralı listedeki
kodu derlemeye çalıştığımızda bize hata verir:
$ cargo run
Compiling mesaj-iletimi v0.1.0 (file:///projects/mesaj-iletimi)
error[E0382]: borrow of moved value: `deger`
--> src/main.rs:10:27
|
8 | let deger = String::from("merhaba");
| ----- move occurs because `deger` has type `String`, which does not implement the `Copy` trait
9 | gonderici.send(deger).unwrap();
| ----- value moved here
10 | println!("değer şu: {deger}");
| ^^^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `mesaj-iletimi` (bin "mesaj-iletimi") due to 1 previous error
Eşzamanlılıkla ilgili hatamız bu sayede derleme zamanında yakalandı. send
fonksiyonu parametresinin sahipliğini alır ve değer taşındığında alıcı onun
yeni sahibi olur. Böylece gönderdikten sonra aynı değeri yanlışlıkla yeniden
kullanmamız engellenir; sahiplik sistemi her şeyin doğru olduğundan emin olur.
Birden Fazla Değer Göndermek
16-8 numaralı listedeki kod derlenip çalıştı; ama aslında iki ayrı iş parçacığının kanal üzerinden konuştuğunu çok net göstermiyordu.
16-10 numaralı listede, 16-8’deki örneği daha görünür hale getirmek için bazı değişiklikler yaptık: Oluşturulan iş parçacığı artık birden fazla mesaj gönderecek ve her mesaj arasında bir saniye bekleyecek.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (gonderici, alici) = mpsc::channel();
thread::spawn(move || {
let degerler = vec![
String::from("merhaba"),
String::from("olusturulan"),
String::from("is"),
String::from("parcacigindan"),
];
for deger in degerler {
gonderici.send(deger).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for alinan in alici {
println!("Alındı: {alinan}");
}
}
Bu kez oluşturulan iş parçacığında, ana iş parçacığına göndermek istediğimiz
dizgilerden oluşan bir vektör var. Vektör üzerinde dönüyor, her değeri tek tek
gönderiyor ve her gönderim arasında thread::sleep ile bir saniye bekliyoruz.
Ana iş parçacığında artık recv fonksiyonunu açıkça çağırmıyoruz. Bunun
yerine alici değerini bir yineleyici (iterator) gibi kullanıyoruz. Gelen her
değeri ekrana yazdırıyoruz. Kanal kapandığında yineleme de sona eriyor.
16-10 numaralı listedeki kodu çalıştırdığınızda, her satır arasında yaklaşık bir saniye olacak şekilde aşağıdakine benzer bir çıktı görmeniz gerekir:
Alındı: merhaba
Alındı: olusturulan
Alındı: is
Alındı: parcacigindan
Ana iş parçacığındaki for döngüsünde ayrıca bir bekleme ya da gecikme kodu
olmadığı için, ana iş parçacığının oluşturulan iş parçacığından değer gelmesini
beklediğini buradan anlayabiliyoruz.
Birden Fazla Gönderici Oluşturmak
Daha önce mpsc kısaltmasının multiple producer, single consumer anlamına
geldiğini söylemiştik. Şimdi bunu gerçekten kullanalım ve 16-10 numaralı
listedeki kodu genişleterek, aynı alıcıya değer gönderen birden çok iş
parçacığı oluşturalım. Bunu yapmak için göndericiyi klonlayacağız; 16-11
numaralı liste tam olarak bunu gösteriyor.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --snip--
let (gonderici, alici) = mpsc::channel();
let gonderici1 = gonderici.clone();
thread::spawn(move || {
let degerler = vec![
String::from("merhaba"),
String::from("olusturulan"),
String::from("is"),
String::from("parcacigindan"),
];
for deger in degerler {
gonderici1.send(deger).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let degerler = vec![
String::from("daha"),
String::from("fazla"),
String::from("mesaj"),
String::from("sana"),
];
for deger in degerler {
gonderici.send(deger).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for alinan in alici {
println!("Alındı: {alinan}");
}
// --snip--
}
Bu kez ilk iş parçacığını oluşturmadan önce göndericinin clone metodunu
çağırıyoruz. Böylece ilk oluşturulan iş parçacığına verebileceğimiz yeni bir
gönderici elde ediyoruz. Orijinal göndericiyi ise ikinci oluşturulan iş
parçacığına veriyoruz. Böylece elimizde, tek bir alıcıya farklı mesajlar
yollayan iki ayrı iş parçacığı oluyor.
Bu kodu çalıştırdığınızda çıktı aşağıdakine benzer görünür:
Alındı: merhaba
Alındı: daha
Alındı: olusturulan
Alındı: fazla
Alındı: mesaj
Alındı: is
Alındı: sana
Alındı: parcacigindan
Sisteminizin zamanlamasına göre değerleri farklı bir sırada da görebilirsiniz.
İşte eşzamanlılığı hem ilginç hem de zor yapan şeylerden biri bu. thread::sleep
çağrılarındaki sürelerle oynarsanız her çalıştırmada biraz daha farklı ve
öngörülmesi daha zor çıktılar elde edersiniz.
Kanalların nasıl çalıştığını gördüğümüze göre, şimdi eşzamanlılığın başka bir yoluna bakalım.