Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Mesaj İletimiyle İş Parçacıkları Arasında Veri Taşımak

Güvenli eşzamanlılık sağlamak için giderek daha popüler hale gelen yaklaşımlardan biri mesaj iletimi (message passing) modelidir. Bu modelde iş parçacıkları ya da aktörler, veri içeren mesajları birbirlerine göndererek iletişim kurar. Go dilinin belgelerinde geçen şu ünlü söz bunu güzel özetler: “Belleği paylaşarak iletişim kurmayın; iletişim kurarak belleği paylaşın.”

Mesaj göndermeye dayalı eşzamanlılık için Rust standart kütüphanesi kanal uygulaması sunar. Kanal (channel), verinin bir iş parçacığından diğerine gönderilmesini sağlayan genel bir programlama kavramıdır.

Programlamadaki bir kanalı, yönü belli bir su yolu gibi düşünebilirsiniz. Örneğin bir dereye lastik ördek bırakırsanız, ördek akış yönünde ilerleyip su yolunun sonuna kadar gider.

Bir kanalın iki yarısı vardır: gönderici ve alıcı. Gönderici, ördeği suya bıraktığınız yukarı kısımdır; alıcı ise ördeğin aşağı akışta ulaştığı yerdir. Kodunuzun bir bölümü göndermek istediği veriyle gönderici tarafın metodlarını çağırır; başka bir bölümü de alıcı ucunda yeni mesaj gelip gelmediğini kontrol eder. Gönderici ya da alıcı taraflardan biri düşürüldüğünde kanalın kapandığı söylenir.

Burada, değer üreten ve bunları kanal üzerinden gönderen bir iş parçacığı ile bu değerleri alıp ekrana yazdıran başka bir iş parçacığı olan küçük bir program kuracağız. Özelliği göstermek için kanaldan basit değerler göndereceğiz. Bu yönteme alıştıktan sonra, örneğin bir sohbet sistemi ya da hesabın farklı parçalarını yapan birçok iş parçacığının sonuçları tek bir iş parçacığında topladığı sistemler gibi, iletişim kurması gereken her durumda kanalları kullanabilirsiniz.

Önce 16-6 numaralı listede bir kanal oluşturacağız; ama henüz onunla hiçbir şey yapmayacağız. Bunun şimdilik derlenmediğine dikkat edin; çünkü Rust, kanal üzerinden hangi tür değerler göndermek istediğimizi henüz bilemez.

Filename: src/main.rs
use std::sync::mpsc;

fn main() {
    let (gonderici, alici) = mpsc::channel();
}
Listing 16-6: Bir kanal oluşturup iki yarısını gonderici ve alici değişkenlerine atamak

Yeni bir kanal oluşturmak için mpsc::channel fonksiyonunu çağırırız. Buradaki mpsc, multiple producer, single consumer ifadesinin kısaltmasıdır. Kısaca Rust standart kütüphanesindeki kanal uygulaması, bir kanalın değer üreten birden fazla gönderici ucu olmasına izin verir; ama o değerleri tüketen tek bir alıcı ucu vardır. Bunu birçok küçük akarsuyun birleşip tek bir nehre dönüşmesine benzetebilirsiniz: Farklı akışlardan gelen her şey sonunda aynı nehirde toplanır. Şimdilik tek bir göndericiyle başlayacağız; örnek çalışır hale gelince birden fazla gönderici ekleyeceğiz.

mpsc::channel fonksiyonu bir demet döndürür. İlk eleman gönderme ucu, yani göndericidir; ikinci eleman ise alma ucu, yani alıcıdır. Birçok alanda transmitter ve receiver için geleneksel olarak tx ve rx kısaltmaları kullanılır; biz burada değişkenleri doğrudan gonderici ve alici olarak adlandırdık. Bu demeti parçalayan bir desenle birlikte let ifadesi kullanıyoruz. let ifadelerindeki desenler ile parçalayıcı atamayı 19. bölümde inceleyeceğiz. Şimdilik bilinmesi gereken şu: mpsc::channel dönüşündeki demetin parçalarını çıkarmak için bu kullanım çok elverişlidir.

Şimdi gönderici tarafı oluşturulan bir iş parçacığına taşıyalım ve oradan tek bir dizgi gönderelim. Böylece oluşturulan iş parçacığı ana iş parçacığıyla iletişim kurmuş olacak. Bu, nehrin yukarı kısmına lastik ördek bırakmaya ya da bir iş parçacığından diğerine sohbet mesajı göndermeye benzer.

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (gonderici, alici) = mpsc::channel();

    thread::spawn(move || {
        let deger = String::from("merhaba");
        gonderici.send(deger).unwrap();
    });
}
Listing 16-7: gonderici değerini oluşturulan iş parçacığına taşıyıp "merhaba" göndermek

Yine thread::spawn ile yeni bir iş parçacığı oluşturuyor, ardından move yardımıyla gonderici değerini kapanışa taşıyoruz. Böylece göndericinin sahipliği oluşturulan iş parçacığında oluyor. Kanal üzerinden mesaj gönderebilmek için o iş parçacığının göndericiye sahip olması gerekir.

Gönderici tarafında, göndermek istediğimiz değeri alan bir send metodu vardır. send metodu Result<T, E> döndürür. Yani alıcı daha önce düşürülmüşse ve değeri gönderecek yer kalmamışsa, gönderme işlemi hata döndürür. Bu örnekte hata olursa paniklemek için unwrap çağırıyoruz. Gerçek bir uygulamada ise bunu uygun şekilde ele almak isteriz; doğru hata yönetimi stratejileri için 9. bölüme dönebilirsiniz.

16-8 numaralı listede, ana iş parçacığında alıcıdan gelen değeri alacağız. Bu da nehrin sonundan lastik ördeği almak ya da gelen bir sohbet mesajını okumak gibidir.

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (gonderici, alici) = mpsc::channel();

    thread::spawn(move || {
        let deger = String::from("merhaba");
        gonderici.send(deger).unwrap();
    });

    let alinan = alici.recv().unwrap();
    println!("Alındı: {alinan}");
}
Listing 16-8: Ana iş parçacığında "merhaba" değerini alıp ekrana yazdırmak

Alıcı tarafın kullanışlı iki metodu vardır: recv ve try_recv. Burada receive kelimesinin kısaltması olan recv metodunu kullanıyoruz. Bu metod, ana iş parçacığını bloklayarak kanal üzerinden bir değer gönderilmesini bekler. Bir değer geldiğinde onu Result<T, E> içinde döndürür. Gönderici uç kapandığında ise artık yeni değer gelmeyeceğini belirtmek için hata döndürür.

try_recv metodu ise bloklamaz; hemen Result<T, E> döndürür. O anda mesaj varsa Ok, yoksa Err gelir. Bu, mesaj beklerken aynı iş parçacığının başka işleri de varsa kullanışlıdır: Arada bir try_recv çağırabilir, mesaj geldiyse onu işleyebilir, gelmediyse kısa süre başka işler yapıp sonra yeniden kontrol edebilirsiniz.

Bu örnekte sadelik için recv kullandık; çünkü ana iş parçacığının yapacak başka bir işi yok, yalnızca mesaj gelmesini bekliyor.

16-8 numaralı listedeki kodu çalıştırdığımızda, ana iş parçacığından şu değerin yazdırıldığını görürüz:

Alındı: merhaba

Tam istediğimiz gibi!

Kanallar Üzerinden Sahiplik Aktarmak

Mesaj gönderirken sahiplik kuralları çok kritik bir rol oynar; çünkü güvenli eşzamanlı kod yazmanızı sağlar. Rust programlarınız boyunca sahipliği düşünmek, eşzamanlı programlamadaki hataları önlemenin büyük bir parçasıdır. Kanallar ile sahipliğin birlikte nasıl çalıştığını görmek için küçük bir deney yapalım: Oluşturulan iş parçacığında deger adlı bir değeri kanaldan gönderdikten sonra yeniden kullanmaya çalışacağız. 16-9 numaralı listedeki kodu derlemeyi deneyin; neden buna izin verilmediğini göreceksiniz.

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (gonderici, alici) = mpsc::channel();

    thread::spawn(move || {
        let deger = String::from("merhaba");
        gonderici.send(deger).unwrap();
        println!("değer şu: {deger}");
    });

    let alinan = alici.recv().unwrap();
    println!("Alındı: {alinan}");
}
Listing 16-9: deger kanal üzerinden gönderildikten sonra onu yeniden kullanmaya çalışmak

Burada deger değişkenini gonderici.send ile kanala yolladıktan sonra ekrana yazdırmaya çalışıyoruz. Buna izin verilmesi kötü olurdu; çünkü değer başka bir iş parçacığına gönderildiği anda, o iş parçacığı bizim yeniden kullanmaya çalışmamızdan önce değeri değiştirebilir ya da düşürebilirdi. Böyle bir durumda tutarsız ya da hiç var olmayan veri yüzünden beklenmedik sonuçlar ve hatalar ortaya çıkabilirdi. Neyse ki Rust, 16-9 numaralı listedeki kodu derlemeye çalıştığımızda bize hata verir:

$ cargo run
   Compiling mesaj-iletimi v0.1.0 (file:///projects/mesaj-iletimi)
error[E0382]: borrow of moved value: `deger`
  --> src/main.rs:10:27
   |
 8 |         let deger = String::from("merhaba");
   |             ----- move occurs because `deger` has type `String`, which does not implement the `Copy` trait
 9 |         gonderici.send(deger).unwrap();
   |                        ----- value moved here
10 |         println!("değer şu: {deger}");
   |                             ^^^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `mesaj-iletimi` (bin "mesaj-iletimi") due to 1 previous error

Eşzamanlılıkla ilgili hatamız bu sayede derleme zamanında yakalandı. send fonksiyonu parametresinin sahipliğini alır ve değer taşındığında alıcı onun yeni sahibi olur. Böylece gönderdikten sonra aynı değeri yanlışlıkla yeniden kullanmamız engellenir; sahiplik sistemi her şeyin doğru olduğundan emin olur.

Birden Fazla Değer Göndermek

16-8 numaralı listedeki kod derlenip çalıştı; ama aslında iki ayrı iş parçacığının kanal üzerinden konuştuğunu çok net göstermiyordu.

16-10 numaralı listede, 16-8’deki örneği daha görünür hale getirmek için bazı değişiklikler yaptık: Oluşturulan iş parçacığı artık birden fazla mesaj gönderecek ve her mesaj arasında bir saniye bekleyecek.

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (gonderici, alici) = mpsc::channel();

    thread::spawn(move || {
        let degerler = vec![
            String::from("merhaba"),
            String::from("olusturulan"),
            String::from("is"),
            String::from("parcacigindan"),
        ];

        for deger in degerler {
            gonderici.send(deger).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for alinan in alici {
        println!("Alındı: {alinan}");
    }
}
Listing 16-10: Birden fazla mesaj gönderip her birinin arasında beklemek

Bu kez oluşturulan iş parçacığında, ana iş parçacığına göndermek istediğimiz dizgilerden oluşan bir vektör var. Vektör üzerinde dönüyor, her değeri tek tek gönderiyor ve her gönderim arasında thread::sleep ile bir saniye bekliyoruz.

Ana iş parçacığında artık recv fonksiyonunu açıkça çağırmıyoruz. Bunun yerine alici değerini bir yineleyici (iterator) gibi kullanıyoruz. Gelen her değeri ekrana yazdırıyoruz. Kanal kapandığında yineleme de sona eriyor.

16-10 numaralı listedeki kodu çalıştırdığınızda, her satır arasında yaklaşık bir saniye olacak şekilde aşağıdakine benzer bir çıktı görmeniz gerekir:

Alındı: merhaba
Alındı: olusturulan
Alındı: is
Alındı: parcacigindan

Ana iş parçacığındaki for döngüsünde ayrıca bir bekleme ya da gecikme kodu olmadığı için, ana iş parçacığının oluşturulan iş parçacığından değer gelmesini beklediğini buradan anlayabiliyoruz.

Birden Fazla Gönderici Oluşturmak

Daha önce mpsc kısaltmasının multiple producer, single consumer anlamına geldiğini söylemiştik. Şimdi bunu gerçekten kullanalım ve 16-10 numaralı listedeki kodu genişleterek, aynı alıcıya değer gönderen birden çok iş parçacığı oluşturalım. Bunu yapmak için göndericiyi klonlayacağız; 16-11 numaralı liste tam olarak bunu gösteriyor.

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (gonderici, alici) = mpsc::channel();

    let gonderici1 = gonderici.clone();
    thread::spawn(move || {
        let degerler = vec![
            String::from("merhaba"),
            String::from("olusturulan"),
            String::from("is"),
            String::from("parcacigindan"),
        ];

        for deger in degerler {
            gonderici1.send(deger).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let degerler = vec![
            String::from("daha"),
            String::from("fazla"),
            String::from("mesaj"),
            String::from("sana"),
        ];

        for deger in degerler {
            gonderici.send(deger).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for alinan in alici {
        println!("Alındı: {alinan}");
    }

    // --snip--
}
Listing 16-11: Birden fazla göndericiden çoklu mesaj göndermek

Bu kez ilk iş parçacığını oluşturmadan önce göndericinin clone metodunu çağırıyoruz. Böylece ilk oluşturulan iş parçacığına verebileceğimiz yeni bir gönderici elde ediyoruz. Orijinal göndericiyi ise ikinci oluşturulan iş parçacığına veriyoruz. Böylece elimizde, tek bir alıcıya farklı mesajlar yollayan iki ayrı iş parçacığı oluyor.

Bu kodu çalıştırdığınızda çıktı aşağıdakine benzer görünür:

Alındı: merhaba
Alındı: daha
Alındı: olusturulan
Alındı: fazla
Alındı: mesaj
Alındı: is
Alındı: sana
Alındı: parcacigindan

Sisteminizin zamanlamasına göre değerleri farklı bir sırada da görebilirsiniz. İşte eşzamanlılığı hem ilginç hem de zor yapan şeylerden biri bu. thread::sleep çağrılarındaki sürelerle oynarsanız her çalıştırmada biraz daha farklı ve öngörülmesi daha zor çıktılar elde edersiniz.

Kanalların nasıl çalıştığını gördüğümüze göre, şimdi eşzamanlılığın başka bir yoluna bakalım.