Produttore e consumatore

Abbiamo visto che i processi possono comunicare scambiando messaggi (es. PIPE). Il modello a scambio di messaggi di presta, di fatto, ad applicazioni strutturate come il produttore/consumatore: un processo produce dati e l’altro li utilizza. (Esempio: server di stampa)

Scambio di messaggi

Nel modello a scambio di messaggi lo schema produttore/consumatore si realizza immediatamente come segue:

port A; // ad esempio una pipe condivisa

Produttore() {
  while(1) {
    /* produce d */
    send(A,d); // invia d su A
  }
}

Consumatore() {
  while(1) {
    receive(A,&d); // riceve d da A
    /* consuma d */
  }
}

Se utilizziamo una send asincrona (bloccante quando il buffer è pieno) e una receive sincrona, otteniamo una soluzione soddisfacente: il consumatore attende se non ci sono dati da consumare, il produttore attende se non c’è spazio nel buffer.

Memoria condivisa

Il modello a memoria condivisa viene utilizzato in molti contesti. Ad esempio:

  • per effettuare calcolo parallelo (in architetture multicore)
  • per realizzare applicazioni modulari
  • quanto è utile condividere informazioni tra processi e thread

Un processo ha, nella sua Process Control Block (PCB), informazioni relative a:

  • risorse allocate (codice, dati, memoria, I/O, …)
  • esecuzione (ID, priorità, stato PC e registri, stack, …)

Un thread è la parte di esecuzione di un processo ed eredita dal processo le risorse. Quando abbiamo più thread in un singolo processo si parla di multi-threading. In questo caso ogni thread avrà un ID distinto. I thread di uno stesso processo tipicamente ne condividono la memoria, in quanto essa fa parte delle risorse del processo.

Produttore-Consumatore con memoria condivisa e buffer illimitato

Vediamo quindi come realizzare un modello di comunicazione produttore/consumatore in presenza di memoria condivisa. Utilizzeremo la memoria come canale di comunicazione senza ricorrere a particolari meccanismi di comunicazione tra processi, quali le PIPE.

Per semplicità iniziamo considerando un buffer illimitato, implementato tramite un array buffer[] condiviso e due indici inserisci e preleva inizializzati a 0. Vediamo una prima soluzione errata:

data_t buffer[...]; // un buffer "teorico" di dimensione illimitata
int inserisci=0, preleva=0; // indici per l'accesso al buffer

Produttore() {
  while(1) {
    /* produce un elemento d */
    buffer[inserisci] = d;
    inserisci++;
  }
}

Consumatore() {
  while(1) {
    d = buffer[preleva];
    preleva++;
    /* consuma d */
  }
}

PROBLEMA: Il consumatore non attende che ci siano elementi nel buffer e quindi legge “sporco di memoria” nel caso sia più veloce del produttore. Non possiamo fare assunzioni sullo scheduling quindi dobbiamo introdurre un meccanismo di attesa.

SOLUZIONE: Una soluzione è data dalla tecnica di busy-waiting: il thread cicla a vuoto finché non ci sono elementi da consumare:

data_t buffer[...]; // un buffer "teorico" di dimensione illimitata
int inserisci=0, preleva=0; // indici per l'accesso al buffer

Produttore() {
  while(1) {
    /* produce un elemento d */
    buffer[inserisci] = d;
    inserisci++;
  }
}

Consumatore() {
  while(1) {
    while (inserisci == preleva) {}; // buffer vuoto attendo
    d = buffer[preleva];
    preleva++;
    /* consuma d */
  }
}

Produttore-Consumatore con memoria condivisa e buffer circolare

La soluzione presentata funziona, ma assume l’esistenza di un buffer illimitato. Come possiamo renderla realistica? Utilizziamo un buffer circolare di dimensione MAX. Ecco un primo tentativo non corretto:

data_t buffer[MAX]; // un buffer di dimensione MAX
int inserisci=0, preleva=0; // indici per l'accesso al buffer

Produttore() {
  while(1) {
    /* produce un elemento d */
    buffer[inserisci] = d;
    inserisci=(inserisci+1) % MAX // Il buffer è circolare
  }
}

Consumatore() {
  while(1) {
    while (inserisci == preleva) {}; // buffer vuoto attendo
    d = buffer[preleva];
    preleva=(preleva+1) % MAX // Il buffer è circolare
    /* consuma d */
  }
}

PROBLEMA: se il buffer ha dimensione limitata il produttore, raggiunto MAX, inizierà a scrivere dalla casella 0, sovrascrivendo i dati non ancora letti dal consumatore. Come nel caso della send, il produttore si deve bloccare quando il buffer è pieno.

SOLUZIONE: è necessario aggiungere un busy-waiting anche sul produttore ma non è possibile usare la stessa condizione utilizzata nel consumatore per il buffer vuoto, cioè inserisci==preleva, in quanto il codice stallerebbe: il produttore si bloccherebbe sul while (inserisci == preleva) {} senza mai produrre nulla. Una soluzione possibile è di considerare il buffer vuoto quando inserisci e preleva coincidono e il buffer pieno quando (inserisci+1) % MAX coincide con preleva (quando cioè c’è una sola cella libera). Il codice corretto è il seguente:

data_t buffer[MAX]; // un buffer di dimensione MAX
int inserisci=0, preleva=0; // indici per l'accesso al buffer

Produttore() {
  while(1) {
    /* produce un elemento d */
    while ((inserisci+1) % MAX == preleva) {}; // buffer pieno attendo
    buffer[inserisci] = d;
    inserisci=(inserisci+1) % MAX // Il buffer è circolare
  }
}

Consumatore() {
  while(1) {
    while (inserisci == preleva) {}; // buffer vuoto attendo
    d = buffer[preleva];
    preleva=(preleva+1) % MAX // Il buffer è circolare
    /* consuma d */
  }
}

Questa soluzione funzionante ha alcuni evidenti difetti:

  1. Spreca una cella di memoria per distinguere buffer pieno da buffer vuoto
  2. Spreca tempo di CPU nel busy-waiting
  3. non scala con più produttori e più consumatori perché la condivisione di inserisci e preleva comporterebbe altre interferenze (che discuteremo tra poco)

Interferenze tra variabili condivise

Per risolvere il primo punto si potrebbe usare un contatore condiviso inizializzato a 0. Ecco una tentativo non funzionante:

data_t buffer[MAX]; // un buffer di dimensione MAX
int inserisci=0, preleva=0; // indici per l'accesso al buffer
int contatore=0; // contatore per le condizioni di attesa

Produttore() {
  while(1) {
    /* produce un elemento d */
    while (contatore == MAX) {}; // buffer pieno attendo
    buffer[inserisci] = d;
    inserisci=(inserisci+1) % MAX // Il buffer è circolare
    contatore++; // aggiorno il contatore
  }
}

Consumatore() {
  while(1) {
    while (contatore == 0) {}; // buffer vuoto attendo
    d = buffer[preleva];
    preleva=(preleva+1) % MAX // Il buffer è circolare
    contatore--; // aggiorno il contatore
    /* consuma d */
  }
}

PROBLEMA: I thread aggiornano la variabile contatore contemporaneamente. Questo può creare interferenze che compromettono l’integrità dei dati:

L’incremento è in realtà una lettura e una riscrittura. Se r1 indica un registro, possiamo immaginare che l’incremento venga eseguito con le seguenti tre istruzioni

1a) r1 = contatore; (lettura da memoria in un registro)
2a) r1 = r1+1; (incremento del registro)
3a) contatore = r1 (riscrittura del registro in memoria)

e il decremento, analogamente:

1b) r2 = contatore;
2b) r2 = r2-1;
3b) contatore = r2

Otteniamo risultati diversi a seconda di come queste istruzioni vengono schedulate. Ad esempio partendo da contatore=5:

  • 1a,1b,2b,3b,2a,3a otteniamo contatore=6
  • 1b,1a,2a,3a,2b,3b otteniamo contatore=4
  • 1a,2a,3a,1b,2b,3b ottemiamo contatore=5

Race condition: l’output dipende dalla temporizzazione degli eventi. Accade quando c’è competizione per la stessa risorsa.

Serve un meccanismo per sincronizzare i thread ma purtroppo avevamo aggiunto il contatore proprio allo scopo di sincronizzare. Nella prossima lezione vedremo come risolvere questo problema di sincronizzazione in modo più generale e sistematico.