Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Zarif Kapanış ve Temizlik

Liste 21-20’deki kod artık istekleri iş parçacığı havuzu üzerinden eşzamanlı işliyor. Ancak calisanlar, kimlik ve thread gibi alanların dolaylı kullanıldığına dair uyarılar alıyoruz; bu da aslında henüz hiçbir şeyi temizlemediğimizi hatırlatıyor. Ana iş parçacığını kaba biçimde ctrl-C ile durdurduğumuzda, diğer iş parçacıkları da o anda ne yapıyor olurlarsa olsunlar aniden kesiliyor.

Şimdi Drop trait’ini uygulayarak havuzdaki her iş parçacığı üzerinde join çağıracağız. Böylece kapanmadan önce üzerinde çalıştıkları istekleri bitirebilecekler. Sonra da onlara yeni görev almayı bırakıp kapanmaları gerektiğini bildireceğiz.

IsParcacigiHavuzu Üzerinde Drop Uygulamak

İlk adım Drop uygulaması. Havuz kapsam dışına çıktığında, bütün iş parçacıklarının işini bitirip birleşmesini istiyoruz. Liste 21-22 ilk denemeyi gösteriyor.

Filename: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct IsParcacigiHavuzu {
    calisanlar: Vec<Calisan>,
    gonderici: mpsc::Sender<Gorev>,
}

type Gorev = Box<dyn FnOnce() + Send + 'static>;

impl IsParcacigiHavuzu {
    /// Yeni bir IsParcacigiHavuzu olusturur.
    ///
    /// Boyut, havuzdaki is parcacigi sayisidir.
    ///
    /// # Panics
    ///
    /// `new` fonksiyonu, boyut sifirsa panikler.
    pub fn new(boyut: usize) -> IsParcacigiHavuzu {
        assert!(boyut > 0);

        let (gonderici, alici) = mpsc::channel();

        let alici = Arc::new(Mutex::new(alici));

        let mut calisanlar = Vec::with_capacity(boyut);

        for kimlik in 0..boyut {
            calisanlar.push(Calisan::new(kimlik, Arc::clone(&alici)));
        }

        IsParcacigiHavuzu {
            calisanlar,
            gonderici,
        }
    }

    pub fn calistir<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let gorev = Box::new(f);

        self.gonderici.send(gorev).unwrap();
    }
}

impl Drop for IsParcacigiHavuzu {
    fn drop(&mut self) {
        for calisan in &mut self.calisanlar {
            println!("Çalışan kapatılıyor: {}", calisan.kimlik);

            calisan.thread.join().unwrap();
        }
    }
}

struct Calisan {
    kimlik: usize,
    thread: thread::JoinHandle<()>,
}

impl Calisan {
    fn new(kimlik: usize, alici: Arc<Mutex<mpsc::Receiver<Gorev>>>) -> Calisan {
        let thread = thread::spawn(move || loop {
            let gorev = alici.lock().unwrap().recv().unwrap();

            println!("Çalışan {kimlik} bir görev aldı; çalıştırılıyor.");

            gorev();
        });

        Calisan { kimlik, thread }
    }
}
Listing 21-22: İş parçacığı havuzu kapsam dışına çıktığında her iş parçacığını birleştirmeye çalışmak

Burada her Calisan üzerinde dönüp onun iş parçacığına join çağırıyoruz. Ama derleyici hata verir; çünkü join, sahiplik alır. Bizde ise yalnızca ödünç alınmış calisan vardır.

Bunu çözmek için iş parçacığını Calisan içinden dışarı taşımamız gerekir. Bunun bir yolu alanı Option<thread::JoinHandle<()>> yapmak ve take ile içinden almak olabilir. Ancak bunu her yerde taşımak kodu gereksiz yere karmaşıklaştırabilir.

Bu bölümde daha temiz bir yaklaşım kullanacağız: Vec::drain. drain(..) ile vektördeki bütün öğeleri dışarı alıp onları tüketebiliriz. Güncel drop uygulaması şöyle görünür:

Filename: src/lib.rs
#![allow(unused)]
fn main() {
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct IsParcacigiHavuzu {
    calisanlar: Vec<Calisan>,
    gonderici: mpsc::Sender<Gorev>,
}

type Gorev = Box<dyn FnOnce() + Send + 'static>;

impl IsParcacigiHavuzu {
    /// Yeni bir IsParcacigiHavuzu olusturur.
    ///
    /// Boyut, havuzdaki is parcacigi sayisidir.
    ///
    /// # Panics
    ///
    /// `new` fonksiyonu, boyut sifirsa panikler.
    pub fn new(boyut: usize) -> IsParcacigiHavuzu {
        assert!(boyut > 0);

        let (gonderici, alici) = mpsc::channel();

        let alici = Arc::new(Mutex::new(alici));

        let mut calisanlar = Vec::with_capacity(boyut);

        for kimlik in 0..boyut {
            calisanlar.push(Calisan::new(kimlik, Arc::clone(&alici)));
        }

        IsParcacigiHavuzu {
            calisanlar,
            gonderici,
        }
    }

    pub fn calistir<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let gorev = Box::new(f);

        self.gonderici.send(gorev).unwrap();
    }
}

impl Drop for IsParcacigiHavuzu {
    fn drop(&mut self) {
        for calisan in self.calisanlar.drain(..) {
            println!("Çalışan kapatılıyor: {}", calisan.kimlik);

            calisan.thread.join().unwrap();
        }
    }
}

struct Calisan {
    kimlik: usize,
    thread: thread::JoinHandle<()>,
}

impl Calisan {
    fn new(kimlik: usize, alici: Arc<Mutex<mpsc::Receiver<Gorev>>>) -> Calisan {
        let thread = thread::spawn(move || loop {
            let gorev = alici.lock().unwrap().recv().unwrap();

            println!("Çalışan {kimlik} bir görev aldı; çalıştırılıyor.");

            gorev();
        });

        Calisan { kimlik, thread }
    }
}
}

Bu sürüm derleyici hatasını çözer. Yine de bir sorun daha var: bu hâliyle kapanış istediğimiz gibi çalışmıyor.

İş Parçacıklarına Yeni Görev Dinlemeyi Bırakmalarını Söylemek

Sorun şu: Calisan iş parçacıkları sonsuz döngü içinde sürekli görev bekliyor. Biz join çağırınca, onlar hâlâ beklediği için ana iş parçacığı sonsuza kadar bloklanabilir.

Bunu düzeltmek için önce gonderici tarafını açıkça düşürmemiz gerekiyor. Kanalın gönderici tarafı kapanınca, recv hata döndürür; bu da alıcı tarafta döngüden çıkma sinyali olarak kullanılabilir.

Liste 21-23, gonderici değerini Option içine alıp drop içinde take() ile kapattığımız sürümü gösteriyor.

Filename: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct IsParcacigiHavuzu {
    calisanlar: Vec<Calisan>,
    gonderici: Option<mpsc::Sender<Gorev>>,
}
// --snip--

type Gorev = Box<dyn FnOnce() + Send + 'static>;

impl IsParcacigiHavuzu {
    /// Yeni bir IsParcacigiHavuzu olusturur.
    ///
    /// Boyut, havuzdaki is parcacigi sayisidir.
    ///
    /// # Panics
    ///
    /// `new` fonksiyonu, boyut sifirsa panikler.
    pub fn new(boyut: usize) -> IsParcacigiHavuzu {
        // --snip--

        assert!(boyut > 0);

        let (gonderici, alici) = mpsc::channel();

        let alici = Arc::new(Mutex::new(alici));

        let mut calisanlar = Vec::with_capacity(boyut);

        for kimlik in 0..boyut {
            calisanlar.push(Calisan::new(kimlik, Arc::clone(&alici)));
        }

        IsParcacigiHavuzu {
            calisanlar,
            gonderici: Some(gonderici),
        }
    }

    pub fn calistir<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let gorev = Box::new(f);

        self.gonderici.as_ref().unwrap().send(gorev).unwrap();
    }
}

impl Drop for IsParcacigiHavuzu {
    fn drop(&mut self) {
        drop(self.gonderici.take());

        for calisan in self.calisanlar.drain(..) {
            println!("Çalışan kapatılıyor: {}", calisan.kimlik);

            calisan.thread.join().unwrap();
        }
    }
}

struct Calisan {
    kimlik: usize,
    thread: thread::JoinHandle<()>,
}

impl Calisan {
    fn new(kimlik: usize, alici: Arc<Mutex<mpsc::Receiver<Gorev>>>) -> Calisan {
        let thread = thread::spawn(move || loop {
            let gorev = alici.lock().unwrap().recv().unwrap();

            println!("Çalışan {kimlik} bir görev aldı; çalıştırılıyor.");

            gorev();
        });

        Calisan { kimlik, thread }
    }
}
Listing 21-23: Calisan iş parçacıklarını birleştirmeden önce gonderici değerini açıkça düşürmek

Bu değişiklikten sonra kanal kapanır. Böylece Calisan tarafında recv artık hata döndürmeye başlayabilir. Şimdi döngüyü buna göre güncelleyelim. Liste 21-24, recv hata döndürdüğünde döngüden zarif biçimde çıkan sürümü gösteriyor.

Filename: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct IsParcacigiHavuzu {
    calisanlar: Vec<Calisan>,
    gonderici: Option<mpsc::Sender<Gorev>>,
}

type Gorev = Box<dyn FnOnce() + Send + 'static>;

impl IsParcacigiHavuzu {
    /// Yeni bir IsParcacigiHavuzu olusturur.
    ///
    /// Boyut, havuzdaki is parcacigi sayisidir.
    ///
    /// # Panics
    ///
    /// `new` fonksiyonu, boyut sifirsa panikler.
    pub fn new(boyut: usize) -> IsParcacigiHavuzu {
        assert!(boyut > 0);

        let (gonderici, alici) = mpsc::channel();

        let alici = Arc::new(Mutex::new(alici));

        let mut calisanlar = Vec::with_capacity(boyut);

        for kimlik in 0..boyut {
            calisanlar.push(Calisan::new(kimlik, Arc::clone(&alici)));
        }

        IsParcacigiHavuzu {
            calisanlar,
            gonderici: Some(gonderici),
        }
    }

    pub fn calistir<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let gorev = Box::new(f);

        self.gonderici.as_ref().unwrap().send(gorev).unwrap();
    }
}

impl Drop for IsParcacigiHavuzu {
    fn drop(&mut self) {
        drop(self.gonderici.take());

        for calisan in self.calisanlar.drain(..) {
            println!("Çalışan kapatılıyor: {}", calisan.kimlik);

            calisan.thread.join().unwrap();
        }
    }
}

struct Calisan {
    kimlik: usize,
    thread: thread::JoinHandle<()>,
}

impl Calisan {
    fn new(kimlik: usize, alici: Arc<Mutex<mpsc::Receiver<Gorev>>>) -> Calisan {
        let thread = thread::spawn(move || loop {
            let mesaj = alici.lock().unwrap().recv();

            match mesaj {
                Ok(gorev) => {
                    println!(
                        "Çalışan {kimlik} bir görev aldı; çalıştırılıyor."
                    );

                    gorev();
                }
                Err(_) => {
                    println!(
                        "Çalışan {kimlik} bağlantısı kesildi; kapatılıyor."
                    );
                    break;
                }
            }
        });

        Calisan { kimlik, thread }
    }
}
Listing 21-24: recv hata döndürdüğünde döngüden açıkça çıkmak

Artık her Calisan, kanalın kapandığını görünce yeni görev beklemeyi bırakır, bir ileti yazar ve döngüden çıkar.

Sunucuyu Sınırlı İstekten Sonra Kapatmak

Bu davranışı gözle görmek için, main fonksiyonunu yalnızca iki isteği kabul edecek şekilde değiştirebiliriz. Liste 21-25 bunu gösteriyor.

Filename: src/main.rs
use merhaba::IsParcacigiHavuzu;
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let dinleyici = TcpListener::bind("127.0.0.1:7878").unwrap();
    let havuz = IsParcacigiHavuzu::new(4);

    for akis in dinleyici.incoming().take(2) {
        let akis = akis.unwrap();

        havuz.calistir(|| {
            baglantiyi_isle(akis);
        });
    }

    println!("Kapatılıyor.");
}

fn baglantiyi_isle(mut akis: TcpStream) {
    let tamponlu_okuyucu = BufReader::new(&akis);
    let istek_satiri = tamponlu_okuyucu.lines().next().unwrap().unwrap();

    let (durum_satiri, dosya_adi) = match &istek_satiri[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "merhaba.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "merhaba.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let icerik = fs::read_to_string(dosya_adi).unwrap();
    let uzunluk = icerik.len();

    let yanit =
        format!("{durum_satiri}\r\nContent-Length: {uzunluk}\r\n\r\n{icerik}");

    akis.write_all(yanit.as_bytes()).unwrap();
}
Listing 21-25: Döngüden çıkarak sunucuyu iki isteğin ardından kapatmak

Gerçek bir web sunucusunun iki istekten sonra kapanmasını istemezsiniz. Buradaki amaç yalnızca zarif kapanışın gerçekten çalıştığını göstermek.

take(2), yineleyicinin en fazla ilk iki öğesini almasını sağlar. main sonlandığında IsParcacigiHavuzu da kapsam dışına çıkar ve drop çalışır.

Sunucuyu cargo run ile başlatıp üç istek gönderirseniz, üçüncü isteğin başarısız olduğunu ve terminalde buna benzer iletiler gördüğünüzü fark edersiniz:

$ cargo run
   Compiling merhaba v0.1.0 (file:///projects/merhaba)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
     Running `target/debug/merhaba`
Çalışan 0 bir görev aldı; çalıştırılıyor.
Kapatılıyor.
Çalışan kapatılıyor: 0
Çalışan 3 bir görev aldı; çalıştırılıyor.
Çalışan 1 bağlantısı kesildi; kapatılıyor.
Çalışan 2 bağlantısı kesildi; kapatılıyor.
Çalışan 3 bağlantısı kesildi; kapatılıyor.
Çalışan 0 bağlantısı kesildi; kapatılıyor.
Çalışan kapatılıyor: 1
Çalışan kapatılıyor: 2
Çalışan kapatılıyor: 3

İletilerin sırası sizde farklı olabilir. Ama genel akış şudur:

  • İlk iki isteği bazı Calisanlar alır.
  • Sunucu artık yeni bağlantı kabul etmeyi bırakır.
  • drop, gonderici değerini düşürür.
  • Her Calisan, kanal kapandığını fark eder ve döngüden çıkar.
  • Havuz da her iş parçacığı üzerinde join çağırıp temiz kapanış yapar.

İlginç bir ayrıntı daha var: ana iş parçacığı, bazı Calisanlar hata alıp döngüden çıkmadan önce ilk join çağrısını yapabilir. Bu durumda kısa süre bloklanır; ama diğer iş parçacıkları işlerini bitirdikçe hepsi sırayla kapanır.

Böylece projemizi tamamlamış olduk. Artık elimizde iş parçacığı havuzu kullanan, istekleri eşzamanlı ele alan ve zarif biçimde kapanabilen temel bir web sunucusu var.

Tam sürüm için aşağıdaki iki dosyaya bakabilirsiniz:

Filename: src/main.rs
use merhaba::IsParcacigiHavuzu;
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let dinleyici = TcpListener::bind("127.0.0.1:7878").unwrap();
    let havuz = IsParcacigiHavuzu::new(4);

    for akis in dinleyici.incoming().take(2) {
        let akis = akis.unwrap();

        havuz.calistir(|| {
            baglantiyi_isle(akis);
        });
    }

    println!("Kapatılıyor.");
}

fn baglantiyi_isle(mut akis: TcpStream) {
    let tamponlu_okuyucu = BufReader::new(&akis);
    let istek_satiri = tamponlu_okuyucu.lines().next().unwrap().unwrap();

    let (durum_satiri, dosya_adi) = match &istek_satiri[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "merhaba.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "merhaba.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let icerik = fs::read_to_string(dosya_adi).unwrap();
    let uzunluk = icerik.len();

    let yanit =
        format!("{durum_satiri}\r\nContent-Length: {uzunluk}\r\n\r\n{icerik}");

    akis.write_all(yanit.as_bytes()).unwrap();
}
Filename: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct IsParcacigiHavuzu {
    calisanlar: Vec<Calisan>,
    gonderici: Option<mpsc::Sender<Gorev>>,
}

type Gorev = Box<dyn FnOnce() + Send + 'static>;

impl IsParcacigiHavuzu {
    /// Yeni bir IsParcacigiHavuzu olusturur.
    ///
    /// Boyut, havuzdaki is parcacigi sayisidir.
    ///
    /// # Panics
    ///
    /// `new` fonksiyonu, boyut sifirsa panikler.
    pub fn new(boyut: usize) -> IsParcacigiHavuzu {
        assert!(boyut > 0);

        let (gonderici, alici) = mpsc::channel();

        let alici = Arc::new(Mutex::new(alici));

        let mut calisanlar = Vec::with_capacity(boyut);

        for kimlik in 0..boyut {
            calisanlar.push(Calisan::new(kimlik, Arc::clone(&alici)));
        }

        IsParcacigiHavuzu {
            calisanlar,
            gonderici: Some(gonderici),
        }
    }

    pub fn calistir<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let gorev = Box::new(f);

        self.gonderici.as_ref().unwrap().send(gorev).unwrap();
    }
}

impl Drop for IsParcacigiHavuzu {
    fn drop(&mut self) {
        drop(self.gonderici.take());

        for calisan in &mut self.calisanlar {
            println!("Çalışan kapatılıyor: {}", calisan.kimlik);

            if let Some(thread) = calisan.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Calisan {
    kimlik: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Calisan {
    fn new(kimlik: usize, alici: Arc<Mutex<mpsc::Receiver<Gorev>>>) -> Calisan {
        let thread = thread::spawn(move || loop {
            let mesaj = alici.lock().unwrap().recv();

            match mesaj {
                Ok(gorev) => {
                    println!(
                        "Çalışan {kimlik} bir görev aldı; çalıştırılıyor."
                    );

                    gorev();
                }
                Err(_) => {
                    println!(
                        "Çalışan {kimlik} bağlantısı kesildi; kapatılıyor."
                    );
                    break;
                }
            }
        });

        Calisan {
            kimlik,
            thread: Some(thread),
        }
    }
}

Bu projeyi geliştirmeye devam etmek isterseniz:

  • IsParcacigiHavuzu ve açık metodlarına daha fazla belge ekleyebilirsiniz.
  • Kütüphane işlevleri için testler yazabilirsiniz.
  • unwrap çağrılarını daha sağlam hata yönetimiyle değiştirebilirsiniz.
  • IsParcacigiHavuzunu web sunucusu dışında başka görevlerde kullanabilirsiniz.
  • crates.io üstündeki hazır bir havuz crate’i ile benzer sunucuyu yeniden yazıp API farklarını inceleyebilirsiniz.

Özet

Kitabın sonuna geldiniz. Artık kendi Rust projelerinizi geliştirecek, başkalarının projelerine katkı verecek ve daha ileri konuları anlamlandıracak temel bilgiye sahipsiniz. Takıldığınız yerde yardım alabileceğiniz, canlı ve destekleyici bir Rust topluluğu da var.