Contattaci
Lasciaci i tuoi riferimenti, saremo felici di contattarti il prima possibile e organizzare una consulenza gratuita.
Reactive Programming: parallelizzare con Project Reactor
Cos'è la programmazione reattiva
Il reactive programming o "programmazione reattiva" è un paradigma di programmazione che si concentra sulla gestione di flussi di dati asincroni e di eventi che possono accadere in modo imprevedibile e non deterministico.
Il framework Project Reactor fornisce un'implementazione di questo paradigma per la piattaforma Java, consentendo agli sviluppatori di creare applicazioni reattive in modo più semplice ed efficiente.
Nella programmazione reattiva ci sono due figure principali:
- Observable: è la fonte di dati o eventi. Produttore di flussi di dati asincroni;
- Observer: è il consumatore di dati o eventi all'interno del flusso. Possono eseguire azioni come l'elaborazione dei dati, la trasformazione, la filtrazione o la gestione degli errori.
Nel mondo reattivo, gli Observable e gli Observer collaborano per creare un flusso di dati asincrono in cui le informazioni fluiranno dal produttore al consumatore in modo efficiente e sincronizzato. Questo paradigma è particolarmente adatto per gestire scenari in cui gli eventi sono generati in modo sporadico o non deterministico, come le richieste di un server, gli aggiornamenti di sensori o i dati di flusso provenienti da fonti esterne.
Esistono diverse librerie Java che implementano il paradigma della programmazione reattiva. Tra le più conosciute, troviamo Project Reactor e RxJava. In questo articolo ci concentreremo su Project Reactor, ma è importante notare che RxJava è un'altra opzione valida.
Introduzione a Project Reactor
Iniziamo col definire cosa sia Project Reactor.
Project Reactor è un framework reattivo basato su JVM che offre un supporto avanzato per la programmazione reattiva.
Concetti chiave di Project Reactor
Di seguito vediamo alcuni concetti chiave associati a Project Reactor.
Flux e Mono
Project Reactor introduce due tipi principali di sequenze reattive:
- Flux: Rappresenta uno stream di dati che può emettere zero o più elementi;
- Mono: Rappresenta uno stream di dati che può emettere al massimo un elemento (zero o uno).
Operatori Reattivi
Project Reactor offre una vasta gamma di operatori reattivi per eseguire operazioni di trasformazione, filtraggio, concatenazione, gestione degli errori e altro ancora
Scheduler
Fornisce Scheduler che consentono di specificare su quali thread eseguire operazioni reattive, consentendo il controllo sulla concorrenza e l'asincronia.
Gestione degli Errori
Project Reactor gestisce in modo elegante gli errori nelle sequenze reattive, consentendo di definire logiche di fallback e di retry.
Backpressure
PR supporta la gestione dello "backpressure", che consente ai consumatori di controllare la velocità di produzione di dati in modo da evitare l'overload.
Gestione delle Risorse
PR offre strumenti per gestire automaticamente le risorse, come la chiusura di connessioni o la liberazione di memoria, quando non sono più necessarie
Publisher e Subscriber
In Project Reactor gli Observable vengono chiamati Publisher e gli Observer invece Subscriber.
Flussi in Project Reactor
Nei flussi in Project Reactor possono passare tre tipi di segnali (Sink):
- Next: rappresenta un evento che trasporta un valore nel flusso di dati. È utilizzato per emettere nuovi valori all'interno del flusso.
- Complete: rappresenta un evento che indica la fine del flusso di dati. Quando unPublisher emette un Complete, significa che non ci saranno ulteriori eventi nel flusso
- Error: rappresenta un evento che indica un errore nel flusso di dati. Quando siverifica un errore, il flusso viene interrotto e può essere gestito da unSubscriber per la gestione degli errori.
Integrazioni di Project Reactor
Project Reactor si integra con una varietà di componenti e librerie nel mondo Java. Alcune delle integrazioni più comuni includono:
- Redis
- Webclient
Tutorial: come ottenere un flusso in Project Reactor
Per ottenere un flusso (Flux) o un mono (Mono) con Project Reactor, è possibile utilizzare diversi metodi factory forniti dalla libreria. Ecco alcuni esempi di come ottenere un flusso o un mono.
Metodo Flux/Mono.just:
Il metodo just crea un flusso o un mono da uno o più valori specificati direttamente come argomenti. Tipicamente si usa quando si hanno già i dati da inserire nel flusso o nel mono.
Flux<Integer> flux = Flux.just(1, 2, 3, 4);
Mono<String> mono = Mono.just("Hello, world!");
Metodo Mono.fromCallable
Il metodo fromCallable crea un mono da un'operazione asincrona rappresentata da un Callable. Questo è utile quando si ha un'operazione che potrebbe restituire un valore.
Mono<Integer> mono = Mono.fromCallable(() -> {
// Effettua un'operazione complessa
return 42;
});
Metodo Flux.fromIterable
Il metodo fromIterable crea un flusso da una collezione iterabile (ad esempio, una lista) o da una sequenza di elementi. È utile quando si vuole trasformare una collezione esistente in un flusso.
List<Integer> list = Arrays.asList(1, 2, 3, 4);
Flux<Integer> flux = Flux.fromIterable(list);
Metodo Flux.range
Il metodo Flux.range consente di generare una sequenza reattiva di valori interi all'interno di un intervallo specificato. Il metodo ha la seguente firma:
Flux<Integer> range(int start, int count)
Dove:
- start è il valore di partenza della sequenza.
- count è il numero di elementi nella sequenza.
Ecco un esempio di come utilizzare Flux.range:
// Creare una sequenza reattiva di numeri da 1 a 5
Flux<Integer> numbers = Flux.range(1, 5);
Come sottoscriversi ad uno stream in Project Reactor
La sottoscrizione è l'azione che avvia l'esecuzione del flusso e consente di gestire i dati che vengono emessi in modo asincrono.
Per potersi sottoscrivere ad uno stream in Project Reactor si utilizza il metodo subscribe.
Il metodo .subscribe() può essere utilizzato in varie forme con diverse firme per consentire al programmatore di specificare come gestire i dati emessi, gli errori o il completamento del flusso. Ecco alcune delle forme più comuni:
- .subscribe(Consumer<T> consumer): Specifica un consumatore (Consumer) che gestirà i valori emessi dal flusso;
- .subscribe(Consumer<T> consumer,Consumer<Throwable> errorConsumer): Specifica un consumatore per i valori e un consumatore per gli errori. Gli errori che si verificano durante l'esecuzione del flusso verranno gestiti dal consumatore degli errori;
- .subscribe(Consumer<T> consumer,Consumer<Throwable> errorConsumer, Runnable completeHandler): Questa forma aggiunge anche un gestore per il completamento del flusso. Il completeHandler verrà eseguito quando il flussotermina con successo.
Ecco un esempio di utilizzo di .subscribe():
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
numbers.subscribe(
value -> System.out.println("Valore emesso: " + value),
error -> System.err.println("Errore: " + error),
() -> System.out.println("Flussocompletato")
);
Operatori in Project Reactor
In questo capitolo, esploreremo alcuni degli operatori più comuni disponibili in Project Reactor. Questi operatori sono fondamentali per trasformare, filtrare e manipolare i flussi reattivi in vari modi.
Operatore filter
L'operatore filter permette di filtrare un flusso reattivo in base a una condizione specificata.Questo operatore emetterà solo gli elementi che soddisfano la condizione di filtro.
Flux<Integer> numbers = Flux.range(1, 10);
Flux<Integer> evenNumbers = numbers.filter(value -> value % 2 == 0);
evenNumbers.subscribe(value -> {
System.out.println("Numero pari: " + value);
});
In questo esempio, filter viene utilizzato per creare un nuovo flusso evenNumbers che contiene solo i numeri pari dall'originale numbers.
Operatore map
L'operatore map permette di trasformare ogni elemento di un flusso reattivo in un nuovo elemento. È utile quando si desidera applicare una funzione di trasformazione a ciascun elemento del flusso.
Flux<Integer> numbers = Flux.range(1, 5);
Flux<String> squaredNumbers = numbers.map(value -> "Numero alquadrato: " + (value * value));
squaredNumbers.subscribe(value -> {
System.out.println(value);
});
In questo esempio, map è utilizzato per trasformare ogni numero nell'originale numbers in una stringa che rappresenta il numero al quadrato.
Operatore collectList
L'operatore collectList viene utilizzato per raccogliere tutti gli elementi di un flusso reattivo in una lista e emettere la lista risultante quando il flusso si completa. Permette di trasformare un Flux in un Mono.
Flux<Integer> numbers = Flux.range(1, 5);
Mono<List<Integer>> collectedNumbers = numbers.collectList();
collectedNumbers.subscribe(list -> {
System.out.println("Lista di numeri: " + list);
});
In questo esempio, collectList raccoglie tutti i numeri emessi dal flusso numbers in una lista ed emette questa lista una volta che il flusso è completo.
Operatore block
L'operatore block è utilizzato per sincronizzare l'esecuzione del codice e attendere che un flusso reattivo si completi. Questo è utile in contesti in cui è necessario ottenere il risultato finale in modo sincrono. L’operatore è applicabile ai Mono.
Mono<String> message = Mono.just("Hello,World!");
String result = message.block();
System.out.println("Messaggio:" + result);
Operatore Merge
L'operatore merge viene utilizzato per combinare più flussi reattivi in uno solo, senza garantire l'ordine di emissione degli elementi. È utile quando si desidera combinare diversi flussi in modo asincrono.
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux<Integer> merged = Flux.merge(flux1, flux2);
merged.subscribe(value -> {
System.out.println("Valore:" + value);
});
In questo esempio, merge viene utilizzato per combinare flux1 e flux2 in un unico flusso merged, che emette tutti gli elementi in modo asincrono.
Scheduler in Project Reactor
Uno Scheduler in Project Reactor è un componente che definisce il thread o il pool di thread che verrà utilizzato per eseguire operazioni reattive in modo asincrono. L'utilizzo di Schedulers è fondamentale per gestire operazioni che richiedono concorrenza, parallelismo o esecuzione asincrona senza bloccare il thread principale.
Di seguito, elencheremo alcuni degli Schedulers più comuni disponibili in Project Reactor e le loro caratteristiche principali.
Schedulers.immediate()
Lo Scheduler Schedulers.immediate() esegue le operazioni nello stesso thread del chiamante, senza alcuna pianificazione o concorrenza.
Schedulers.parallel()
Lo Scheduler Schedulers.parallel() utilizza un pool di thread fisso. La dimensione del thread pool è uguale al numero di processori disponibili sul sistema. È adatto per task che richiedono un alto grado di parallelismo, come l'elaborazione di grandi quantità di dati in parallelo.
Schedulers.boundedElastic()
Lo Scheduler Schedulers.boundedElastic() utilizza un pool di thread elastico, ma limitato a un numero massimo di thread. È pensato per l'esecuzione di operazioni che coinvolgono operazioni di I/O, come richieste HTTP, operazioni di lettura/scrittura su file o connessioni di rete. La dimensione massima del thread pool è determinata dal numero di core moltiplicato per 10.
Schedulers.newBoundedElastic (Versione 3.4.0+)
Lo Scheduler Schedulers.newBoundedElastic è stato introdotto nella versione 3.4.0 di ProjectReactor. Permette di creare un thread pool elastico limitato, simile a Schedulers.boundedElastic().
A differenza dei due Schedulers precedenti che sono condivisi tra tutti nel sistema, quindi anche utilizzati dalle librerie che sfruttano Project Reactor, Schedulers.newBoundedElastic permette di definire un thread pool che sarà utilizzato solo dove vogliamo noi.
// Esempio di creazione di un Scheduler elastico limitato
Scheduler elasticScheduler = Schedulers.newBoundedElastic(
10, //Numero massimo di thread
100, // Dimensione massima della coda
"my-elastic-scheduler" // Nome del pool di thread
);
Parallelizzazione in Project Reactor
La parallelizzazione è una tecnica fondamentale per migliorare le prestazioni delle applicazioni quando è necessario eseguire operazioni intensive in modo concorrente.
Project Reactor offre strumenti potenti per la parallelizzazione dei flussi reattivi, che possono essere preferibili rispetto all'uso diretto di ThreadPoolExecutor o altre librerie di gestione dei thread.
Ecco alcune ragioni per preferire Project Reactor:
- Gestione automatica dei thread. Project Reactor si occupa automaticamente dell'allocazione e della gestione dei thread necessari per eseguire operazioni reattive in modo parallelo. Questo significa che non è necessario preoccuparsi di creare manualmente e gestire un pool di thread quando si desidera parallelizzare le operazioni. La libreria gestisce i thread in modo efficiente, riducendo al minimo l'overhead e ottimizzando l'utilizzo delle risorse del sistema.
- Facilità di utilizzo. Project Reactor semplifica notevolmente la parallelizzazione delle operazioni attraverso operatori come parallelFlux e flatMap. Questi operatori consentono di parallelizzare facilmente le operazioni su flussi reattivi esistenti, senza la complessità di gestire manualmente i thread o i task.
ParallelFlux
Il Parallel Flux è una variante di Flux che permette di suddividere uno stream di dati in sottostream, ognuno dei quali può essere elaborato su thread separati, consentendo l'elaborazione parallela dei dati.
Partendo da un Flux esistente e utilizzando il metodo parallel(int n) viene creato un ParallelFlux con il numero specificato di sottostream.
Ad esempio:
Flux<Integer> numbers = Flux.range(1, 10);
ParallelFlux<Integer> parallelNumbers = numbers.parallel(2);
In questo esempio, stiamo creando un ParallelFlux che suddivide il flusso in 2 sottostream. Gli eventi nel flux verrano suddivisi nei due sotto stream attraverso una strategia Round-robin.
Utilizzando il metodo .runOn(Schedulers) si specificherà su quali thread, tramite lo Scheduler, dove verranno eseguite le operazioni di ciascun sottostream. Ad esempio:
parallelNumbers.runOn(Schedulers.parallel())
È possibile applicare le operazioni desiderate su ciascun sottostream utilizzando operatori come map, filter, ecc. Le operazioni saranno eseguite in parallelo su thread separati. Ad esempio:
parallelNumbers.map(value -> {
// Elaborazione su ciascun elemento
return value * 2;
})
Alla fine, utilizzando il metodo .sequential() è possibile riottenere un Flux che combina i risultati dei sottostream. Ad esempio:
Flux<Integer> resultFlux = parallelNumbers
.map(value -> value * 2)
.sequential();
Come utilizzare ParallelFlux
Ecco un esempio completo di come utilizzare ParallelFlux per eseguire operazioni parallele su un flusso di numeri:
import reactor.core.publisher.Flux;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Schedulers;
public class ParallelFluxExample {
public static void main(String[] args) {
Flux<Integer> numbers = Flux.range(1, 10);
ParallelFlux<Integer> parallelNumbers = numbers.parallel(2);
Flux<Integer> resultFlux = parallelNumbers
.runOn(Schedulers.boundedElastic())
// Esegui le operazioni su thread separati
.map(value -> {
// Esempio di elaborazione su ciascun elemento
return value * 2;
})
.sequential(); // Ritorna a un Flux sequenziale
resultFlux.subscribe(result -> {
System.out.println("Risultato: " + result);
});
}
}
In questo esempio, stiamo suddividendo il flusso numbers in 2 sottostream, eseguendo le operazioni su thread separati con l'uso di Schedulers.boundedElastic(), e ritornando a un Flux sequenziale alla fine. Questo consente di elaborare gli elementi in parallelo e ottenere un miglioramento delle prestazioni quando necessario.
Da fare attenzione che l’ordine dello stream finale rispetto lo stream iniziale non è garantito. La parallelizzazione comporta l'esecuzione simultanea di operazioni su sottostream separati, il che significa che l'elaborazione di alcuni elementi potrebbe essere più veloce di altri. Di conseguenza, l'ordine di emissione nel flusso risultante potrebbe variare.
Operatori subscribeOn e publishOn
Gli operatori subscribeOn e publishOn sono strumenti potenti per controllare su quali thread vengono eseguiti gli operatori di una catena applicati a un flusso. Questi operatori sono utili per gestire la concorrenza e definire il contesto di esecuzione delle operazioni reattive.
L'operatore subscribeOn specifica su quale thread verrà eseguito l'operatore che produce lo stream. In altre parole, determina il contesto in cui l'intera catena di operatori inizia l'esecuzione.
List<Integer> list = Arrays.asList(1, 2, 3, 4);
Flux.fromIterable(list)
.filter(value -> value % 2 == 1)
.subscribeOn(Schedulers.boundedElastic())
.map(value -> value * 2)
.subscribe(value -> log.info(value));
In questo esempio, l'operatore subscribeOn specifica che la creazione del flusso da list e l'operatore filter verranno eseguiti su un thread del pool elastico. Gli operatori successivi, come map, erediteranno questo contesto di esecuzione.
Nota: Se subscribeOn viene utilizzato più volte nella stessa catena di operatori, solo la prima chiamata ha effetto e le successive vengono ignorate.
L'operatore publishOn specifica su quale thread verranno eseguiti gli operatori successivi. In altre parole, gli operatori che seguono publishOn verranno eseguiti su un thread diverso rispetto agli operatori che precedono publishOn.
List<Integer> list = Arrays.asList(1, 2, 3, 4);
Flux.fromIterable(list)
.filter(value -> value % 2 == 1)
.publishOn(Schedulers.boundedElastic())
.map(value -> value * 2)
.subscribe(value -> log.info(value));
In questo esempio, l'operatore publishOn specifica che l'operatore map e gli operatori successivi verranno eseguiti su un thread del pool elastico. Gli operatori precedenti, come filter, manterranno il loro contesto di esecuzione originale.
Nota: Ogni volta che è presente l'operatore publishOn nella catena, i successivi operatori verranno eseguiti su un thread diverso.
Se si utilizzano entrambi gli operatori subscribeOn e publishOn nella stessa catena di operatori, è importante considerare come influiranno sul contesto di esecuzione:
- L'operatore subscribeOn determina il thread di esecuzione del primo operatore nella catena non appena viene incontrato;
- L'operatore publishOn cambierà il thread di esecuzione delle operazioni successive, ma non influirà sul thread di esecuzione del primo operatore.
Operatore flatMap
L'operatore flatMap è uno degli operatori più potenti e flessibili in Project Reactor. È ampiamente utilizzato per la gestione della trasformazione di ogni elemento in uno stream separato, che può essere di tipo diverso rispetto all'input.
L'operatore flatMap è particolarmente utile quando si desidera applicare operazioni asincrone o non bloccanti a ciascun elemento di un flusso reattivo.
È importante comprendere la differenza fondamentale tra flatMap e l'operatore map:
- L'operatore map applica una trasformazione sincrona a ciascun elemento del flusso e restituisce un nuovo flusso con gli elementi trasformati. È ideale per trasformazioni semplici e sincrone;
- L'operatore flatMap applica una trasformazione asincrona a ciascun elemento del flusso e può restituire un flusso di elementi trasformati. È ideale per trasformazioni complesse o asincrone in cui ogni elemento può generare zero, uno o più elementi nel flusso risultante.
Nella figura vediamo come da ogni evento del flusso in input flatMap genera un sotto flusso dove gli eventi emessi vengono emessi nel flusso di output.
Definizioni dell'operatore flatMap
Vediamo due definizioni dell’operatore flatMap
Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
Flux<R> flatMap(Function<? super T,? extends Publisher<? extendsR>> mapper, int concurrency)
La prima definizione prende in input una funzione di mappatura mapper che trasforma ogni elemento T in un flusso di elementi R. La funzione mapper è asincrona e può restituire zero, uno o più elementi nel flusso risultante.
List<List<Integer>> list = Arrays.asList(
Arrays.asList(1, 2, 3),
Arrays.asList(4, 5, 6),
Arrays.asList(7, 8, 9)
);
Flux.fromIterable(list)
.flatMap(value -> Flux.fromIterable(value))
.subscribe(p -> System.out.println("Valore: " + p));
In questo esempio, flatMap prende una lista di liste e "srotola" ciascuna lista in un flusso separato. Il risultato è un singolo flusso contenente tutti gli elementi delle liste.
La seconda definizione di flapMap aggiunge un parametro concurrency che limita il numero massimo di funzioni di mappatura che possono essere eseguite contemporaneamente. In altre parole, concurrency controlla quante operazioni di mappatura possono essere in corso in parallelo.
Flux.range(1, 10)
.flatMap(value -> Mono.just(value)
.subscribeOn(Schedulers.boundedElastic())
.map(v -> {
System.out.println("Elaborazione del valore:" + v + " su thread: " + Thread.currentThread().getName());
return v * 2;
}), 4) // Limite di concorrenza a 4
.subscribe(System.out::println);
In questo esempio, flatMap viene utilizzato con concorrenza limitata (4) per eseguire operazioni asincrone su elementi del flusso. Ogni elemento viene elaborato in modo asincrono su un thread diverso, e il risultato viene stampato quando è disponibile.
L'operatore flatMap è una potente aggiunta all'arsenale di operatori reattivi e offre un alto grado di flessibilità nella gestione di flussi di dati complessi o asincroni. Può essere utilizzato per eseguire operazioni concorrenti, chiamate di rete, operazioni I/O e molto altro ancora.
Conclusione
La parallelizzazione è un aspetto cruciale della programmazione moderna: Project Reactor offre ParallelFlux e l'operatore flatMap per affrontare sfide di parallelizzazione in modo pulito ed efficiente.
A differenza di altre soluzioni che richiedono una configurazione manuale dei thread, Project Reactor si occupa automaticamente dell'allocazione e della gestione dei thread necessari per l'esecuzione delle operazioni.