Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Tek İş Parçacıklı Sunucudan Çok İş Parçacıklı Sunucuya

Şu anda sunucu istekleri sırayla işliyor. Yani ilk bağlantı bitmeden ikincisine geçemiyor. Kısa süren istekler ile uzun süren istekler karışınca bu yaklaşım verimsiz hâle geliyor. Özellikle uzun süren bir istek geldiğinde, arkasından gelen bütün istekler gereksiz yere beklemek zorunda kalıyor. Önce bu sorunu görünür hâle getireceğiz, sonra çözeceğiz.

Yavaş Bir İsteği Taklit Etmek

Sorunu görmek için /sleep yoluna gelen isteği yapay olarak yavaşlatalım. Liste 21-10, bu istekte yanıt göndermeden önce beş saniye bekleyen sürümü gösteriyor.

Filename: src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --snip--

fn main() {
    let dinleyici = TcpListener::bind("127.0.0.1:7878").unwrap();

    for akis in dinleyici.incoming() {
        let akis = akis.unwrap();

        baglantiyi_isle(akis);
    }
}

fn baglantiyi_isle(mut akis: TcpStream) {
    // --snip--

    let tamponlu_okuyucu = BufReader::new(&akis);
    let istek_satiri = tamponlu_okuyucu.lines().next().unwrap().unwrap();

    let (durum_satiri, dosya_adi) = match &istek_satiri[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "merhaba.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "merhaba.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --snip--

    let icerik = fs::read_to_string(dosya_adi).unwrap();
    let uzunluk = icerik.len();

    let yanit =
        format!("{durum_satiri}\r\nContent-Length: {uzunluk}\r\n\r\n{icerik}");

    akis.write_all(yanit.as_bytes()).unwrap();
}
Listing 21-10: Beş saniye uyuyarak yavaş istek taklit etmek

Artık üç durumumuz olduğu için if yerine match kullanıyoruz. istek_satiri dilimini eşleştirerek:

  • / isteğinde başarılı sayfayı
  • /sleep isteğinde beş saniye bekledikten sonra yine başarılı sayfayı
  • diğer bütün isteklerde 404 sayfasını

döndürüyoruz.

Sunucuyu çalıştırıp iki tarayıcı sekmesi açın: biri http://127.0.0.1:7878, diğeri http://127.0.0.1:7878/sleep. Önce /sleep isteğini gönderip sonra / isteğini yenilerseniz, ikinci isteğin de beklemek zorunda kaldığını görürsünüz. Sorun tam olarak bu: tek iş parçacıklı yapı, bağımsız istekleri birbirinin arkasına diziyor.

İş Parçacığı Havuzuyla Aktarım Kapasitesini Artırmak

İş parçacığı havuzu (thread pool), önceden başlatılmış ve görev bekleyen bir iş parçacıkları grubudur. Program yeni görev aldığında, havuzdaki uygun iş parçacıklarından biri bu görevi üstlenir. İşini bitiren iş parçacığı tekrar havuza döner ve yeni görev bekler.

Bu yaklaşım sayesinde bağlantıları eşzamanlı işleyebilir, dolayısıyla sunucunun toplam aktarım kapasitesini artırabiliriz.

Havuzdaki iş parçacığı sayısını küçük bir sabit sayı ile sınırlayacağız. Eğer her istek geldiğinde yeni iş parçacığı oluştursaydık, çok sayıda istek gönderen biri tüm sistem kaynaklarını tüketebilir ve sunucuyu kullanılmaz hâle getirebilirdi.

Biz bunun yerine, sabit sayıda iş parçacığı oluşturup gelen istekleri bu havuza vereceğiz. Havuz bir görev kuyruğu tutacak. Her iş parçacığı bu kuyruktan görev alacak, görevi çalıştıracak ve sonra yeni görev isteyecek. Böylece aynı anda en fazla N istek işlenebilir; burada N, havuzdaki iş parçacığı sayısıdır.

Bu, aktarım kapasitesini artırmanın tek yolu değildir. fork/join, tek iş parçacıklı asenkron G/Ç, çok iş parçacıklı asenkron G/Ç gibi başka modeller de vardır. Ama burada bizim hedefimiz düşük seviyede temel fikri öğrenmek.

Her İstek İçin Ayrı İş Parçacığı Oluşturmak

İlk olarak, sanki her bağlantı için yeni iş parçacığı açacakmışız gibi düşünelim. Bu nihai çözümümüz olmayacak; ama çok iş parçacıklı çalışan ilk sürümü görmek için iyi bir başlangıçtır. Liste 21-11, for döngüsünde her akış için yeni iş parçacığı başlatan kodu gösteriyor.

Filename: src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let dinleyici = TcpListener::bind("127.0.0.1:7878").unwrap();

    for akis in dinleyici.incoming() {
        let akis = akis.unwrap();

        thread::spawn(|| {
            baglantiyi_isle(akis);
        });
    }
}

fn baglantiyi_isle(mut akis: TcpStream) {
    let tamponlu_okuyucu = BufReader::new(&akis);
    let istek_satiri = tamponlu_okuyucu.lines().next().unwrap().unwrap();

    let (durum_satiri, dosya_adi) = match &istek_satiri[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "merhaba.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "merhaba.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let icerik = fs::read_to_string(dosya_adi).unwrap();
    let uzunluk = icerik.len();

    let yanit =
        format!("{durum_satiri}\r\nContent-Length: {uzunluk}\r\n\r\n{icerik}");

    akis.write_all(yanit.as_bytes()).unwrap();
}
Listing 21-11: Her akış için yeni iş parçacığı başlatmak

Bu sürümü çalıştırıp bir sekmede /sleep, başka sekmelerde / isteği gönderirseniz, kısa isteklerin artık uzun isteği beklemediğini görürsünüz. Ama bu yaklaşımın da sınırı yoktur; istek geldikçe yeni iş parçacığı açılır. Uzun vadede bu sistemi zorlar.

  1. bölümde gördüğümüz async ve await tam da böyle senaryolarda çok güçlüdür. Ama burada önce iş parçacığı havuzunu elle kuracağız.

Sınırlı Sayıda İş Parçacığı Oluşturmak

Amacımız, thread::spawn ile çok benzer bir arayüze sahip bir iş parçacığı havuzu yazmak. Böylece kodu kullanan taraf için geçiş çok büyük olmaz. Liste 21-12, kullanmak istediğimiz hayali IsParcacigiHavuzu arayüzünü gösteriyor.

Filename: src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let dinleyici = TcpListener::bind("127.0.0.1:7878").unwrap();
    let havuz = IsParcacigiHavuzu::new(4);

    for akis in dinleyici.incoming() {
        let akis = akis.unwrap();

        havuz.calistir(|| {
            baglantiyi_isle(akis);
        });
    }
}

fn baglantiyi_isle(mut akis: TcpStream) {
    let tamponlu_okuyucu = BufReader::new(&akis);
    let istek_satiri = tamponlu_okuyucu.lines().next().unwrap().unwrap();

    let (durum_satiri, dosya_adi) = match &istek_satiri[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "merhaba.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "merhaba.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let icerik = fs::read_to_string(dosya_adi).unwrap();
    let uzunluk = icerik.len();

    let yanit =
        format!("{durum_satiri}\r\nContent-Length: {uzunluk}\r\n\r\n{icerik}");

    akis.write_all(yanit.as_bytes()).unwrap();
}
Listing 21-12: Ulaşmak istediğimiz IsParcacigiHavuzu arayüzü

Burada IsParcacigiHavuzu::new(4) ile dört iş parçacıklı yeni havuz oluşturuyoruz. Ardından havuz.calistir(...) ile her bağlantı için çalıştırılacak kapanışı veriyoruz. Kod henüz derlenmeyecek; ama bu iyi. Derleyicinin yönlendirmesiyle adım adım havuzu oluşturacağız.

Derleyici Yönlendirmeli Geliştirme ile IsParcacigiHavuzu Kurmak

Önce Liste 21-12’deki değişikliği yapıp cargo check çalıştırın. İlk hata, bize bir IsParcacigiHavuzu türü ya da modülü eksik olduğunu söyleyecektir. Güzel; şimdi onu yazalım.

Bu havuz uygulamasının web sunucusundan bağımsız olmasını istiyoruz. Bu yüzden merhaba crate’ini yalnızca ikili crate olmaktan çıkarıp kütüphane crate olarak da kullanalım. src/lib.rs içine Liste 21-13’teki en basit yapıyı ekleyin.

Filename: src/lib.rs
pub struct IsParcacigiHavuzu;

impl IsParcacigiHavuzu {
    /// Yeni bir IsParcacigiHavuzu olusturur.
    ///
    /// Boyut, havuzdaki is parcacigi sayisidir.
    ///
    /// # Panics
    ///
    /// `new` fonksiyonu, boyut sifirsa panikler.
    pub fn new(boyut: usize) -> IsParcacigiHavuzu {
        assert!(boyut > 0);

        IsParcacigiHavuzu
    }

    // --snip--
    pub fn calistir<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
Listing 21-13: Şimdilik en basit IsParcacigiHavuzu tanımı

Sonra main.rs içine bu türü kapsam içine alan satırı ekleyin:

Filename: src/main.rs
use merhaba::IsParcacigiHavuzu;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let dinleyici = TcpListener::bind("127.0.0.1:7878").unwrap();
    let havuz = IsParcacigiHavuzu::new(4);

    for akis in dinleyici.incoming() {
        let akis = akis.unwrap();

        havuz.calistir(|| {
            baglantiyi_isle(akis);
        });
    }
}

fn baglantiyi_isle(mut akis: TcpStream) {
    let tamponlu_okuyucu = BufReader::new(&akis);
    let istek_satiri = tamponlu_okuyucu.lines().next().unwrap().unwrap();

    let (durum_satiri, dosya_adi) = match &istek_satiri[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "merhaba.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "merhaba.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let icerik = fs::read_to_string(dosya_adi).unwrap();
    let uzunluk = icerik.len();

    let yanit =
        format!("{durum_satiri}\r\nContent-Length: {uzunluk}\r\n\r\n{icerik}");

    akis.write_all(yanit.as_bytes()).unwrap();
}

Şimdi derleyici bize new metodunun eksik olduğunu söyleyecek. En basit biçimiyle bunu yazalım:

Filename: src/lib.rs
pub struct IsParcacigiHavuzu;

impl IsParcacigiHavuzu {
    pub fn new(boyut: usize) -> IsParcacigiHavuzu {
        IsParcacigiHavuzu
    }
}

Burada boyut için usize kullanıyoruz; çünkü negatif sayıda iş parçacığı zaten anlamlı değildir ve koleksiyon boyutları için Rust’ta doğal tür usize’dır.

Bir sonraki hata, calistir metodunun eksik olduğunu söyleyecek. Şimdilik en basit sürümünü tanımlayalım:

Filename: src/lib.rs
pub struct IsParcacigiHavuzu;

impl IsParcacigiHavuzu {
    // --snip--
    pub fn new(boyut: usize) -> IsParcacigiHavuzu {
        IsParcacigiHavuzu
    }

    pub fn calistir<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

calistir, bir kapanış alıyor. Bu kapanış için FnOnce + Send + 'static sınırını kullanıyoruz. Bunun nedeni, görevi tek seferlik çalıştıracak olmamız, bu görevin başka iş parçacığına aktarılabilmesi gerekmesi ve ömrünün çağıran kapsamdan bağımsız olmasıdır.

Bu noktada kod derlenir; ama havuz henüz hiçbir şey yapmaz. Yine de arayüzün iskeleti hazır.

new İçinde İş Parçacığı Sayısını Doğrulamak

boyut = 0 da usize için geçerli bir değer olduğu hâlde, sıfır iş parçacıklı havuz anlamsızdır. Bu yüzden new içinde assert! ile boyut > 0 kontrolü yapacağız. Liste 21-13’teki belgeli sürüm bunu gösteriyor.

Bu örnekte new panikler. İstersek bunun yerine build adında Result döndüren bir API de tasarlayabilirdik; ama burada sıfır iş parçacıklı havuz oluşturmayı kurtarılamaz hata sayıyoruz.

İş Parçacıklarını Saklayacak Yer Açmak

Şimdi geçerli sayıda iş parçacığı oluşturup bunları havuz içinde saklamamız gerekiyor. thread::spawn bize JoinHandle<T> döndürür. Bizim kapanışlarımız bir değer döndürmeyeceği için bu tür JoinHandle<()> olur.

Liste 21-14, doğrudan iş parçacıklarını tutan vektörlü ilk sürümü gösteriyor.

Filename: src/lib.rs
use std::thread;

pub struct IsParcacigiHavuzu {
    is_parcacigis: Vec<thread::JoinHandle<()>>,
}

impl IsParcacigiHavuzu {
    // --snip--
    /// Yeni bir IsParcacigiHavuzu olusturur.
    ///
    /// Boyut, havuzdaki is parcacigi sayisidir.
    ///
    /// # Panics
    ///
    /// `new` fonksiyonu, boyut sifirsa panikler.
    pub fn new(boyut: usize) -> IsParcacigiHavuzu {
        assert!(boyut > 0);

        let mut is_parcacigis = Vec::with_capacity(boyut);

        for _ in 0..boyut {
            // create some is_parcacigis and store them in the vector
        }

        IsParcacigiHavuzu { is_parcacigis }
    }
    // --snip--

    pub fn calistir<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
Listing 21-14: IsParcacigiHavuzu içinde iş parçacıklarını tutacak vektör oluşturmak

Bu sürüm henüz gerçek iş parçacığı oluşturmuyor; ama yapıyı hazırlıyor.

IsParcacigiHavuzu İçinden İş Parçacığına Kod Göndermek

Asıl zorluk burada başlıyor: thread::spawn, iş parçacığı oluşturulduğu anda çalıştırılacak kod bekler. Oysa biz iş parçacıklarını önceden oluşturup, çalıştıracakları işi daha sonra vermek istiyoruz.

Bunu çözmek için araya yeni bir yapı ekleyeceğiz: Calisan. Her Calisan, bir JoinHandle<()> ve ayırt edici bir kimlik taşıyacak. Böylece:

  1. Calisan, kimlik ve JoinHandle<()> tutacak
  2. IsParcacigiHavuzu, doğrudan iş parçacıkları yerine Calisan vektörü tutacak
  3. Calisan::new, bir kimlik alıp boş kapanışla başlatılmış iş parçacığına sahip Calisan döndürecek
  4. IsParcacigiHavuzu::new, bu Calisan örneklerini oluşturup saklayacak

Liste 21-15 bu düzeni gösteriyor.

Filename: src/lib.rs
use std::thread;

pub struct IsParcacigiHavuzu {
    calisanlar: Vec<Calisan>,
}

impl IsParcacigiHavuzu {
    // --snip--
    /// Yeni bir IsParcacigiHavuzu olusturur.
    ///
    /// Boyut, havuzdaki is parcacigi sayisidir.
    ///
    /// # Panics
    ///
    /// `new` fonksiyonu, boyut sifirsa panikler.
    pub fn new(boyut: usize) -> IsParcacigiHavuzu {
        assert!(boyut > 0);

        let mut calisanlar = Vec::with_capacity(boyut);

        for kimlik in 0..boyut {
            calisanlar.push(Calisan::new(kimlik));
        }

        IsParcacigiHavuzu { calisanlar }
    }
    // --snip--

    pub fn calistir<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Calisan {
    kimlik: usize,
    thread: thread::JoinHandle<()>,
}

impl Calisan {
    fn new(kimlik: usize) -> Calisan {
        let thread = thread::spawn(|| {});

        Calisan { kimlik, thread }
    }
}
Listing 21-15: Doğrudan iş parçacıkları yerine Calisan örnekleri tutacak şekilde IsParcacigiHavuzu yapısını değiştirmek

IsParcacigiHavuzu içindeki alan adı artık doğrudan iş parçacığı tutmadığı için calisanlar oldu. Calisan ve onun new fonksiyonu dış dünyaya açık değil; bu, havuzun iç ayrıntısı.

Kanallar Üzerinden Görev Göndermek

Şimdi calistir metodunun aldığı kapanışı, önceden çalışan Calisan iş parçacıklarına nasıl ulaştıracağımızı çözmeliyiz. Bunun için 16. bölümde gördüğümüz kanalları kullanacağız.

Fikir şu:

  1. IsParcacigiHavuzu, bir kanal oluşturup gönderici tarafını tutacak
  2. Her Calisan, alıcı tarafına erişecek
  3. Kapanışları taşımak için Gorev adlı bir tür tanımlayacağız
  4. calistir, gelen görevi kanal üzerinden gönderecek
  5. Her Calisan, kendi iş parçacığında alıcıyı dinleyip gelen görevi çalıştıracak

Liste 21-16, bu yapının başlangıcını gösteriyor.

Filename: src/lib.rs
use std::{sync::mpsc, thread};

pub struct IsParcacigiHavuzu {
    calisanlar: Vec<Calisan>,
    gonderici: mpsc::Sender<Gorev>,
}

struct Gorev;

impl IsParcacigiHavuzu {
    // --snip--
    /// Yeni bir IsParcacigiHavuzu olusturur.
    ///
    /// Boyut, havuzdaki is parcacigi sayisidir.
    ///
    /// # Panics
    ///
    /// `new` fonksiyonu, boyut sifirsa panikler.
    pub fn new(boyut: usize) -> IsParcacigiHavuzu {
        assert!(boyut > 0);

        let (gonderici, alici) = mpsc::channel();

        let mut calisanlar = Vec::with_capacity(boyut);

        for kimlik in 0..boyut {
            calisanlar.push(Calisan::new(kimlik));
        }

        IsParcacigiHavuzu { calisanlar, gonderici }
    }
    // --snip--

    pub fn calistir<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Calisan {
    kimlik: usize,
    thread: thread::JoinHandle<()>,
}

impl Calisan {
    fn new(kimlik: usize) -> Calisan {
        let thread = thread::spawn(|| {});

        Calisan { kimlik, thread }
    }
}
Listing 21-16: Gorev örneklerini ileten kanalın göndericisini saklayacak biçimde IsParcacigiHavuzu yapısını değiştirmek

Ardından alıcıyı her Calisan içine geçirmeye çalışıyoruz. Ama bir alıcıyı birden çok Calisan arasında paylaşmak istediğimizde derleyici hata verir; çünkü Rust’ın kanal modeli tek tüketicilidir.

Bu yüzden alıcıyı Arc<Mutex<T>> içine koymamız gerekir. Arc, birden fazla Calisanın aynı alıcıyı paylaşmasını; Mutex ise aynı anda yalnızca bir Calisanın görev çekmesini sağlar. Liste 21-18 bunu gösteriyor.

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};
// --snip--

pub struct IsParcacigiHavuzu {
    calisanlar: Vec<Calisan>,
    gonderici: mpsc::Sender<Gorev>,
}

struct Gorev;

impl IsParcacigiHavuzu {
    // --snip--
    /// Yeni bir IsParcacigiHavuzu olusturur.
    ///
    /// Boyut, havuzdaki is parcacigi sayisidir.
    ///
    /// # Panics
    ///
    /// `new` fonksiyonu, boyut sifirsa panikler.
    pub fn new(boyut: usize) -> IsParcacigiHavuzu {
        assert!(boyut > 0);

        let (gonderici, alici) = mpsc::channel();

        let alici = Arc::new(Mutex::new(alici));

        let mut calisanlar = Vec::with_capacity(boyut);

        for kimlik in 0..boyut {
            calisanlar.push(Calisan::new(kimlik, Arc::clone(&alici)));
        }

        IsParcacigiHavuzu { calisanlar, gonderici }
    }

    // --snip--

    pub fn calistir<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--

struct Calisan {
    kimlik: usize,
    thread: thread::JoinHandle<()>,
}

impl Calisan {
    fn new(kimlik: usize, alici: Arc<Mutex<mpsc::Receiver<Gorev>>>) -> Calisan {
        // --snip--
        let thread = thread::spawn(|| {
            alici;
        });

        Calisan { kimlik, thread }
    }
}
Listing 21-18: Alıcıyı Arc ve Mutex kullanarak Calisan örnekleri arasında paylaşmak

calistir Metodunu Uygulamak

Şimdi Gorev türünü, calistir içinde aldığımız kapanışı tutan Box<dyn FnOnce() + Send + 'static> takma adı hâline getirebiliriz. Liste 21-19 bunu yapıyor.

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct IsParcacigiHavuzu {
    calisanlar: Vec<Calisan>,
    gonderici: mpsc::Sender<Gorev>,
}

// --snip--

type Gorev = Box<dyn FnOnce() + Send + 'static>;

impl IsParcacigiHavuzu {
    // --snip--
    /// Yeni bir IsParcacigiHavuzu olusturur.
    ///
    /// Boyut, havuzdaki is parcacigi sayisidir.
    ///
    /// # Panics
    ///
    /// `new` fonksiyonu, boyut sifirsa panikler.
    pub fn new(boyut: usize) -> IsParcacigiHavuzu {
        assert!(boyut > 0);

        let (gonderici, alici) = mpsc::channel();

        let alici = Arc::new(Mutex::new(alici));

        let mut calisanlar = Vec::with_capacity(boyut);

        for kimlik in 0..boyut {
            calisanlar.push(Calisan::new(kimlik, Arc::clone(&alici)));
        }

        IsParcacigiHavuzu { calisanlar, gonderici }
    }

    pub fn calistir<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let gorev = Box::new(f);

        self.gonderici.send(gorev).unwrap();
    }
}

// --snip--

struct Calisan {
    kimlik: usize,
    thread: thread::JoinHandle<()>,
}

impl Calisan {
    fn new(kimlik: usize, alici: Arc<Mutex<mpsc::Receiver<Gorev>>>) -> Calisan {
        let thread = thread::spawn(|| {
            alici;
        });

        Calisan { kimlik, thread }
    }
}
Listing 21-19: Her kapanışı tutan Box için Gorev takma adı oluşturmak ve görevi kanal üzerinden göndermek

Sonrasında, Calisan::new içinde başlatılan iş parçacığının gerçekten görev alıp çalıştırmasını sağlamalıyız. Liste 21-20’de, her Calisanın sürekli olarak kanaldan görev çekip bunları çalıştırdığı sürüm var.

Filename: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct IsParcacigiHavuzu {
    calisanlar: Vec<Calisan>,
    gonderici: mpsc::Sender<Gorev>,
}

type Gorev = Box<dyn FnOnce() + Send + 'static>;

impl IsParcacigiHavuzu {
    /// Yeni bir IsParcacigiHavuzu olusturur.
    ///
    /// Boyut, havuzdaki is parcacigi sayisidir.
    ///
    /// # Panics
    ///
    /// `new` fonksiyonu, boyut sifirsa panikler.
    pub fn new(boyut: usize) -> IsParcacigiHavuzu {
        assert!(boyut > 0);

        let (gonderici, alici) = mpsc::channel();

        let alici = Arc::new(Mutex::new(alici));

        let mut calisanlar = Vec::with_capacity(boyut);

        for kimlik in 0..boyut {
            calisanlar.push(Calisan::new(kimlik, Arc::clone(&alici)));
        }

        IsParcacigiHavuzu {
            calisanlar,
            gonderici,
        }
    }

    pub fn calistir<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let gorev = Box::new(f);

        self.gonderici.send(gorev).unwrap();
    }
}

struct Calisan {
    kimlik: usize,
    thread: thread::JoinHandle<()>,
}

// --snip--

impl Calisan {
    fn new(kimlik: usize, alici: Arc<Mutex<mpsc::Receiver<Gorev>>>) -> Calisan {
        let thread = thread::spawn(move || loop {
            let gorev = alici.lock().unwrap().recv().unwrap();

            println!("Çalışan {kimlik} bir görev aldı; çalıştırılıyor.");

            gorev();
        });

        Calisan { kimlik, thread }
    }
}
Listing 21-20: Calisan iş parçacığı içinde görevleri alıp çalıştırmak

Burada önce lock ile Mutex kilidi alıyoruz, sonra recv ile kanaldan görev bekliyoruz. recv engelleyicidir; yani görev yoksa iş parçacığı bekler. Yeni görev geldiğinde de onu çalıştırır.

Bu noktada havuz artık gerçekten çalışır. cargo run ile çalıştırıp birkaç istek gönderdiğinizde, dört farklı Calisanın sırayla görev aldığını görmelisiniz.

İsterseniz burada durup şunu da düşünebilirsiniz: Eğer kapanış yerine Future çalıştırıyor olsaydık neler farklı olurdu? Hangi türler değişirdi, hangileri aynı kalırdı?

Son olarak, while let kullanarak başka bir yazım biçimi de mümkün. Liste 21-21 bunu gösteriyor; ama orada kilidin yaşam süresi daha uzun kaldığı için tercih etmiyoruz.

Filename: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct IsParcacigiHavuzu {
    calisanlar: Vec<Calisan>,
    gonderici: mpsc::Sender<Gorev>,
}

type Gorev = Box<dyn FnOnce() + Send + 'static>;

impl IsParcacigiHavuzu {
    /// Yeni bir IsParcacigiHavuzu olusturur.
    ///
    /// Boyut, havuzdaki is parcacigi sayisidir.
    ///
    /// # Panics
    ///
    /// `new` fonksiyonu, boyut sifirsa panikler.
    pub fn new(boyut: usize) -> IsParcacigiHavuzu {
        assert!(boyut > 0);

        let (gonderici, alici) = mpsc::channel();

        let alici = Arc::new(Mutex::new(alici));

        let mut calisanlar = Vec::with_capacity(boyut);

        for kimlik in 0..boyut {
            calisanlar.push(Calisan::new(kimlik, Arc::clone(&alici)));
        }

        IsParcacigiHavuzu {
            calisanlar,
            gonderici,
        }
    }

    pub fn calistir<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let gorev = Box::new(f);

        self.gonderici.send(gorev).unwrap();
    }
}

struct Calisan {
    kimlik: usize,
    thread: thread::JoinHandle<()>,
}
// --snip--

impl Calisan {
    fn new(kimlik: usize, alici: Arc<Mutex<mpsc::Receiver<Gorev>>>) -> Calisan {
        let thread = thread::spawn(move || {
            while let Ok(gorev) = alici.lock().unwrap().recv() {
                println!("Çalışan {kimlik} bir görev aldı; çalıştırılıyor.");

                gorev();
            }
        });

        Calisan { kimlik, thread }
    }
}
Listing 21-21: Calisan::new için while let kullanan alternatif uygulama

Bu sürümde gorev() çağrısı bitene kadar kilit tutulabildiği için diğer Calisanlar yeni görev alamaz. Bu da istemediğimiz bir darboğaz yaratır.