Saga Executors

Overview

A Saga Executor in StackSaga is a specialized component responsible for encapsulating and executing a single atomic transaction within a distributed saga workflow. It acts as the bridge between your business logic and the saga orchestration engine, ensuring that each step in a long-running transaction is performed reliably and consistently. Saga Executors are categorized as either command executors (handling both primary and compensating executions for operations that modify state) or query executors (handling read-only operations). By isolating each atomic transaction in its own executor, StackSaga enables precise control, retry, and compensation mechanisms, reducing the risk of data anomalies and ensuring eventual consistency across microservices.

Understanding Atomic Operation

An atomic transaction is a set of operations that must either all succeed or all fail as a single unit. Within microservices, such a unit is referred to as an atomic execution.

Figure: StackSaga atomic transaction diagram

For instance, in the place-order scenario, there are four atomic executions. Each atomic execution can itself consist of multiple service-level operations. This is perfectly acceptable, as long as they all belong to the same logical unit of work.

Fetching user details differs from the other atomic executions because it is a read-only operation that does not change any state in the user-service’s database. It can therefore be considered an atomic operation, but not an atomic transaction. The others are atomic transactions because they change state in their respective services’ databases.

Understanding atomic executions is crucial for implementing executors correctly in StackSaga, because there is no rollback mechanism once an atomic execution has been committed to its service’s database.

Typically, an atomic execution consists of two phases: primary execution and compensating execution.

  1. Primary Execution (Main Execution)

    • The primary execution represents the core operation within a long-running transaction. Each primary execution is a distinct step in the overall business workflow, advancing the process toward completion. These operations are generally idempotent and isolated, ensuring they can be executed independently without unintended side effects.

      For example, in a place-order workflow, primary executions might include fetching user details, initializing the order, performing pre-authorization, updating stock, and processing the final payment.

    • These primary executions move the process forward.

  2. Compensating Execution (Revert Execution)

    • A compensating execution is designed to undo the effects of a previously completed primary execution if a subsequent step fails. Each primary execution has a corresponding compensating action to reverse its impact, ensuring the system can return to a consistent state in the event of a failure.

    • Compensating executions move the process backward.
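
The forward and backward movement described above can be sketched in plain Java. This is only an illustration of the general saga idea, not StackSaga's API: the Step interface, the run method, and the step names are all hypothetical, and a real engine would also persist state and handle retries.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class SagaFlowSketch {

    /** Hypothetical model of one atomic execution: a primary action plus the action that undoes it. */
    interface Step {
        String name();
        void primary() throws Exception; // moves the process forward
        void compensate();               // moves the process backward
    }

    /** Runs the steps in order; on a failure, compensates the completed steps in reverse order. */
    static List<String> run(List<Step> steps) {
        List<String> log = new ArrayList<>();
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.primary();
                log.add("DONE:" + step.name());
                completed.push(step); // most recently completed step ends up on top
            } catch (Exception e) {
                log.add("FAILED:" + step.name());
                while (!completed.isEmpty()) {
                    Step done = completed.pop();
                    done.compensate();
                    log.add("REVERTED:" + done.name());
                }
                return log;
            }
        }
        return log;
    }

    /** Builds a step whose primary execution either succeeds or throws. */
    static Step step(String name, boolean fails) {
        return new Step() {
            public String name() { return name; }
            public void primary() throws Exception {
                if (fails) throw new Exception(name + " failed");
            }
            public void compensate() { /* nothing to undo in this sketch */ }
        };
    }

    public static void main(String[] args) {
        List<Step> steps = List.of(
                step("initialize-order", false),
                step("pre-authorize", false),
                step("update-stock", true), // simulated failure
                step("final-payment", false));
        System.out.println(run(steps));
    }
}
```

In this sketch the failing step stops forward progress, and only the steps that had already completed are compensated, last one first.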

Executor types

Based on the above classification, there are two main types of saga executors: command executors and query executors. In addition, StackSaga provides a special type called the sub executor to handle extra compensating executions.

The diagram below shows the executor types and the methods each one has.

Figure: StackSaga executor types

Command executors

If an atomic execution has both a primary execution and a compensating execution, it should be implemented as a command executor.
A command executor has two methods: one for the primary execution and one for the compensating execution.

Example executions for a command executor:

  • Initialize order

  • Reserve the items

  • Make the payment

Code Example of Command Executor

@SagaExecutor(executeFor = "order-service", value = "initializeOrderExecutor") (1)
@AllArgsConstructor
public class InitializeOrderExecutor implements CommandExecutor<PlaceOrderAggregator> { (2)

    private final OrderService orderService;

    @Override (3)
    public ProcessStepManager<PlaceOrderAggregator> doProcess(
            PlaceOrderAggregator currentAggregator,
            ProcessStepManagerUtil<PlaceOrderAggregator> stepManager,
            String idempotencyKey
    ) throws RetryableExecutorException, NonRetryableExecutorException {

        try {
            (4)
            String orderId = this.orderService.initializeOrder(
                    currentAggregator.getUsername(),
                    currentAggregator.getProductItems(),
                    currentAggregator.getTotalAmount()
            );
            currentAggregator.setOrderId(orderId);
            return stepManager.next(ReserveItemsExecutor.class, () -> "ORDER_INITIALIZED");(5)
        } catch (FeignException.ServiceUnavailable unavailableException) {
            (6)
            throw RetryableExecutorException
                    .buildWith(unavailableException)
                    .build();
        } catch (FeignException.BadRequest badRequestException) {
            (7)
            throw NonRetryableExecutorException
                    .buildWith(badRequestException)
                    .put("time", LocalDateTime.now())
                    .put("reason", "BadRequest")
                    .build();
        }
    }

    @Override
    @RevertBefore(startFrom = OrderInitializeSubBeforeExecutor.class) (10)
    @RevertAfter(startFrom = OrderInitializeSubAfterExecutor.class) (11)
    public SagaExecutionEventName doRevert(
            NonRetryableExecutorException processException,
            PlaceOrderAggregator finalAggregatorState,
            RevertHintStore revertHintStore,
            String idempotencyKey
    ) throws RetryableExecutorException {
        try {
            (8)
            this.orderService.cancelOrder(finalAggregatorState.getOrderId());
            return () -> "ORDER_CANCELLED";
        } catch (FeignException.ServiceUnavailable unavailableException) {
            (9)
            throw RetryableExecutorException
                    .buildWith(unavailableException)
                    .build();
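        } catch (FeignException.BadRequest badRequestException) {
            (9)
            // The compensation can be skipped here: record a hint and return an event name instead of failing.
            revertHintStore.put("InitializeOrderExecutor:FAILED", processException.getMessage());
            return () -> "ignored";
        }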
    }
}
1 The executor is annotated with @SagaExecutor to register it as a Spring bean and to provide the necessary metadata: the target service that the executor runs against and the unique name of the executor.
2 The executor implements the CommandExecutor interface, which makes it a command executor.
3 The doProcess method is overridden to implement the primary execution logic.
It receives the current state of the aggregator,
a utility for managing the process steps,
and an idempotency key for handling duplicate requests. (The idempotency key stays the same across every retry of the same executor invocation within the same transaction.)
4 The primary execution logic is implemented inside the doProcess method. Here you can read data from the aggregator and update it as needed. The aggregator contains all the changes made by the previous executors so far.
5 If the execution succeeds, navigate to the next executor by using the stepManager.next method.
Provide the class of the next executor and an event name supplier; the event name is stored in the event store to record the successful execution of the current executor.
Alternatively, the stepManager.complete method completes the entire transaction successfully after the current executor has run:
stepManager.complete(() -> "ORDER_INITIALIZED");
6 Catch retryable failures, such as resource unavailability, and notify the SEC so that it keeps the transaction in retry mode. The transaction is then picked up by the next retry schedule that you configured.
Use the RetryableExecutorException.buildWith(Exception e) method to create a RetryableExecutorException that wraps the original exception.
If you throw your own exception without wrapping it in a RetryableExecutorException, the SEC treats it as non-retryable: it stops forward execution and immediately starts the compensating executions in reverse order.
7 Catch non-retryable exceptions and notify the SEC so that it stops forward execution and immediately starts the compensating executions in reverse order.
Use the NonRetryableExecutorException.buildWith(Exception e) method to create a NonRetryableExecutorException that wraps the original exception. You can also attach any metadata that you want to store in the event store for that exception by using the put(String key, Object value) method.
This metadata can be accessed later in the compensating executions through the RevertHintStore that is passed to the doRevert method of the command executor.
If you throw your own exception without wrapping it in a NonRetryableExecutorException, the outcome is the same, because the SEC treats it as non-retryable internally: it stops forward execution and immediately starts the compensating executions in reverse order.
8 The doRevert method is overridden to implement the compensating execution logic.
It receives the exception that caused the compensation,
the final state of the aggregator before compensation,
the RevertHintStore for storing any metadata needed for compensation,
and an idempotency key for handling duplicate requests. (The idempotency key stays the same across every retry of the same executor invocation within the same transaction.)
9 Catch retryable failures, such as resource unavailability, and notify the SEC so that it keeps the transaction in retry mode. The transaction is then picked up by the next retry schedule that you configured.
Use the RetryableExecutorException.buildWith(Exception e) method to create a RetryableExecutorException that wraps the original exception.
If you throw your own exception without wrapping it in a RetryableExecutorException, the SEC treats it as non-retryable and terminates the transaction, because non-retryable exceptions are not allowed in a compensating execution. The same applies to any unwrapped runtime exception: the SEC terminates the transaction immediately.
If a non-retryable exception is possible in the compensating execution and the compensation can safely be skipped, catch that exception and store its metadata in the RevertHintStore by using the put(String key, Object value) method, as in the example above. This avoids terminating the transaction.
10 The @RevertBefore annotation specifies the first sub-before-executor to run before the main compensating execution of this command executor. It is optional; omit it if no sub-before-executors are needed.
11 The @RevertAfter annotation specifies the first sub-after-executor to run after the main compensating execution of this command executor. It is optional; omit it if no sub-after-executors are needed.
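
The idempotency key passed to doProcess and doRevert only helps if the target service honours it. As a minimal sketch, assuming a hypothetical in-memory service (IdempotentOrderServiceSketch, its methods, and the key format are illustrative and not part of StackSaga), a service could deduplicate retried calls like this:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class IdempotentOrderServiceSketch {

    // Remembers which order was created for which idempotency key.
    private final Map<String, String> orderIdByKey = new ConcurrentHashMap<>();
    private final AtomicInteger sequence = new AtomicInteger();

    /** Creates an order once per idempotency key; a retry with the same key gets the same order id back. */
    String initializeOrder(String username, String idempotencyKey) {
        return orderIdByKey.computeIfAbsent(
                idempotencyKey,
                key -> "order-" + sequence.incrementAndGet() + "-" + username);
    }

    /** Number of orders that were actually created. */
    int createdOrders() {
        return orderIdByKey.size();
    }

    public static void main(String[] args) {
        IdempotentOrderServiceSketch service = new IdempotentOrderServiceSketch();
        // The key value below is made up; the text only states that it is identical across retries.
        String first = service.initializeOrder("alice", "tx-1:initializeOrderExecutor");
        String retry = service.initializeOrder("alice", "tx-1:initializeOrderExecutor");
        System.out.println(first.equals(retry));     // same order id for the retried call
        System.out.println(service.createdOrders()); // only one order was created
    }
}
```

Because the key is the same for every retry of the same executor invocation, a retried call after a timeout does not create a second order in this sketch. A production service would typically keep the key in durable storage alongside the order.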

Code Example of Reactive Command Executor

Only the parts that differ from the example above are described.
@SagaExecutor(executeFor = "order-service", value = "initializeOrderExecutor")
@AllArgsConstructor
public class ReactiveInitializeOrderExecutor implements ReactiveCommandExecutor<PlaceOrderAggregator> { (1)

    private final OrderService orderService;

    @Override (2)
    @NonNull
    public Mono<ProcessStepManager<PlaceOrderAggregator>> doProcess(
            PlaceOrderAggregator currentAggregator,
            ProcessStepManagerUtil<PlaceOrderAggregator> stepManager,
            String idempotencyKey
    ) {
        (3)
        return this.orderService
                .createOrder(currentAggregator.getUserData(), currentAggregator.getOrderDetails())
                .map(orderId -> {
                    currentAggregator.setOrderId(orderId);
                    return stepManager.next(ReactiveReserveItemsExecutor.class, () -> "ORDER_INITIALIZED");
                })
                .onErrorResume(throwable -> {
                    if (throwable instanceof ResourceUnavailableException) {
                        return Mono.error(RetryableExecutorException.buildWith(throwable).build());
                    } else {
                        return Mono.error(NonRetryableExecutorException.buildWith(throwable)
                                .put("time", String.valueOf(System.currentTimeMillis()))
                                .put("reason", "BadRequest")
                                .build());
                    }
                });
    }


    @Override (4)
    @NonNull
    public Mono<SagaExecutionEventName> doRevert(NonRetryableExecutorException processException, PlaceOrderAggregator finalAggregatorState, RevertHintStore revertHintStore, String idempotencyKey) {
        (5)
        return this.orderService
                .cancelOrder(finalAggregatorState.getOrderId())
                .thenReturn((SagaExecutionEventName) () -> "ORDER_INITIALIZATION_REVERTED")
                .onErrorResume(throwable -> {
                    if (throwable instanceof ResourceUnavailableException) {
                        return Mono.error(RetryableExecutorException.buildWith(throwable).build());
                    } else {
                        revertHintStore.put("InitializeOrderExecutor:FAILED", processException.getMessage());
                        SagaExecutionEventName ignored = () -> "ignored";
                        return Mono.just(ignored);
                    }
                });
    }
}
1 The executor implements the ReactiveCommandExecutor<PlaceOrderAggregator> interface, which makes it a reactive command executor. The generic type is the aggregator class used by the executor.
2 The doProcess method is overridden to implement the primary execution logic in a reactive way.
3 The primary execution logic is implemented inside the doProcess method as a reactive pipeline.
Unlike the imperative style, exceptions must not be thrown outside the pipeline; signal them through the pipeline instead, for example with Mono.error. The method also must not return null or Mono.empty(). Doing so is treated as a primary execution exception: forward execution stops and the compensating executions start immediately in reverse order.
4 The doRevert method is overridden to implement the compensating execution logic in a reactive way.
5 The compensating execution logic is implemented inside the doRevert method as a reactive pipeline.
The same rules as for the primary execution apply here.
A compensating execution cannot end with a non-retryable exception. If one is possible, catch it, store it in the RevertHintStore, and return a dummy event name, as in the example above, to avoid terminating the transaction.

Query executors

If an atomic execution has only a primary execution, it should be implemented as a query executor.

A query executor has only one method, which performs the primary execution.

Example executions for a query executor:

  • Collecting user delivery details
    This is a read-only operation: it makes no change to the user-service’s database.

Code Example of Query Executor

@SagaExecutor(executeFor = "user-service", value = "checkUserDetailsExecutor") (1)
@AllArgsConstructor
public class CheckUserDetailsExecutor implements QueryExecutor<PlaceOrderAggregator> { (2)

    private final UserService userService;

    @Override (3)
    public ProcessStepManager<PlaceOrderAggregator> doProcess(
            PlaceOrderAggregator currentAggregator,
            ProcessStepManagerUtil<PlaceOrderAggregator> stepManager,
            String idempotencyKey
    ) throws RetryableExecutorException, NonRetryableExecutorException {

        try {
            (4)
            UserDetailDto userDetail = this.userService.getUserDetails(currentAggregator.getUsername());
            currentAggregator.setUserDetail(userDetail);

            return stepManager.next(InitializeOrderExecutor.class, () -> "FETCHED_USER_DETAILS"); (5)
        } catch (FeignException.ServiceUnavailable unavailableException) {
            (6)
            throw RetryableExecutorException
                    .buildWith(unavailableException)
                    .build();
        } catch (FeignException.BadRequest badRequestException) {
            (7)
            throw NonRetryableExecutorException
                    .buildWith(badRequestException)
                    .put("time", LocalDateTime.now())
                    .put("reason", "BadRequest")
                    .build();
        }
    }
}
1 The executor is annotated with @SagaExecutor to register it as a Spring bean and to provide the necessary metadata: the target service that the executor runs against and the unique name of the executor.
2 The executor implements the QueryExecutor interface, which makes it a query executor.
3 The doProcess method is overridden to implement the primary execution logic.
It receives the current state of the aggregator,
a utility for managing the process steps,
and an idempotency key for handling duplicate requests. (The idempotency key stays the same across every retry of the same executor invocation within the same transaction.)
4 The primary execution logic is implemented inside the doProcess method. Here you can read data from the aggregator and update it as needed. The aggregator contains all the changes made by the previous executors so far.
5 If the execution succeeds, navigate to the next executor by using the stepManager.next method.
Provide the class of the next executor and an event name supplier; the event name is stored in the event store to record the successful execution of the current executor.
Alternatively, the stepManager.complete method completes the entire transaction successfully after the current executor has run:
stepManager.complete(() -> "ORDER_INITIALIZED");
6 Catch retryable failures, such as resource unavailability, and notify the SEC so that it keeps the transaction in retry mode. The transaction is then picked up by the next retry schedule that you configured.
Use the RetryableExecutorException.buildWith(Exception e) method to create a RetryableExecutorException that wraps the original exception.
If you throw your own exception without wrapping it in a RetryableExecutorException, the SEC treats it as non-retryable: it stops forward execution and immediately starts the compensating executions in reverse order.
7 Catch non-retryable exceptions and notify the SEC so that it stops forward execution and immediately starts the compensating executions in reverse order.
Use the NonRetryableExecutorException.buildWith(Exception e) method to create a NonRetryableExecutorException that wraps the original exception. You can also attach any metadata that you want to store in the event store for that exception by using the put(String key, Object value) method.
This metadata can be accessed later in the compensating executions through the RevertHintStore that is passed to the doRevert method of the command executor.
If you throw your own exception without wrapping it in a NonRetryableExecutorException, the outcome is the same, because the SEC treats it as non-retryable internally: it stops forward execution and immediately starts the compensating executions in reverse order.
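
The exception-handling rule for primary executions described in callouts 6 and 7 can be summarised in a small, self-contained sketch. The exception classes, the Outcome enum, and the decide method below are hypothetical stand-ins written for illustration; they are not StackSaga types and do not show how the SEC is actually implemented.

```java
class ExceptionClassificationSketch {

    /** Hypothetical stand-ins for the two exception types named in the text. */
    static class RetryableException extends Exception {
        RetryableException(Throwable cause) { super(cause); }
    }

    static class NonRetryableException extends Exception {
        NonRetryableException(Throwable cause) { super(cause); }
    }

    /** What happens to the transaction after a primary execution attempt. */
    enum Outcome { NEXT_STEP, RETRY_LATER, START_COMPENSATION }

    interface PrimaryExecution {
        void run() throws Exception;
    }

    /** Only an explicitly retryable failure is retried; every other failure starts compensation. */
    static Outcome decide(PrimaryExecution execution) {
        try {
            execution.run();
            return Outcome.NEXT_STEP;
        } catch (RetryableException e) {
            return Outcome.RETRY_LATER;
        } catch (Exception e) {
            // A NonRetryableException and an unwrapped exception are treated alike.
            return Outcome.START_COMPENSATION;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(() -> { }));
        System.out.println(decide(() -> {
            throw new RetryableException(new java.io.IOException("service unavailable"));
        }));
        System.out.println(decide(() -> {
            throw new IllegalStateException("unwrapped");
        }));
    }
}
```

The third call shows why wrapping matters: an exception that might have been worth retrying is treated as non-retryable simply because it was not wrapped.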

Code Example of Reactive Query Executor

@SagaExecutor(executeFor = "user-service", value = "reactiveCheckUserDetailsExecutor")
@AllArgsConstructor
public class ReactiveCheckUserDetailsExecutor implements ReactiveQueryExecutor<PlaceOrderAggregator> { (1)

    private final UserService userService;

    @Override (2)
    public Mono<ProcessStepManager<PlaceOrderAggregator>> doProcess(
            PlaceOrderAggregator currentAggregator,
            ProcessStepManagerUtil<PlaceOrderAggregator> stepManager,
            String idempotencyKey
    ) {
        (3)
        return this
                .userService
                .getUserDetails(currentAggregator.getUsername())
                .map(userData -> {
                    currentAggregator.setTel(userData.getPhoneNumber());
                    currentAggregator.setEmail(userData.getEmail());
                    currentAggregator.setAddress(userData.getAddress());
                    return stepManager.next(ReactiveInitializeOrderExecutor.class, () -> "FETCHED_USER_DETAILS");
                })
                .onErrorResume(throwable -> {
                    if (throwable instanceof ResourceUnavailableException) {
                        return Mono.error(RetryableExecutorException.buildWith(throwable).build());
                    } else {
                        return Mono.error(NonRetryableExecutorException.buildWith(throwable)
                                .put("time", String.valueOf(System.currentTimeMillis()))
                                .put("reason", "BadRequest")
                                .build());
                    }
                });
    }
}
1 The executor implements the ReactiveQueryExecutor<PlaceOrderAggregator> interface, which makes it a reactive query executor. The generic type is the aggregator class used by the executor.
2 The doProcess method is overridden to implement the primary execution logic in a reactive way.
3 The primary execution logic is implemented inside the doProcess method as a reactive pipeline.
Unlike the imperative style, exceptions must not be thrown outside the pipeline; signal them through the pipeline instead, for example with Mono.error. The method also must not return null or Mono.empty(). Doing so is treated as a primary execution exception: forward execution stops and the compensating executions start immediately in reverse order.

Sub Executors

In addition to the query executor and the command executor, StackSaga provides a special type of executor called the sub executor. It is used for executing extra compensating atomic transactions in addition to the main compensating transaction.

As you already know, an executor can execute only one atomic transaction. This rule applies to both the primary execution and the compensating execution. Sometimes, however, you might want to run an extra execution when a compensating execution runs.

For instance, imagine a requirement that another service must be notified whenever an order is cancelled. Under the executor rule, you cannot implement both cancelling the order and notifying the other service in the doRevert method, because they are two separate atomic operations. In this situation, you can use a sub executor. Sub executors are divided into two types, based on when they run relative to the main compensating execution.

  1. sub-before-executors

    • Use sub-before-executors when the sub execution should run before the main compensating transaction. You can add any number of sub-before-executors to a command executor and navigate the SEC through them one by one. See the code implementation.

  2. sub-after-executors

    • Use sub-after-executors when the sub execution should run after the main compensating transaction. You can add any number of sub-after-executors to a command executor and navigate the SEC through them one by one. See the code implementation.

A command executor can have both sub-before-executors and sub-after-executors. When both are configured, the entire compensating transaction runs in the following order.

First, all sub-before-executors configured for the command executor run. After they complete, the default (main) compensating execution of the command executor runs. After the main compensating execution completes, the configured sub-after-executors run. The diagram shows the order of, and the relationship between, the sub-before-executors, the main revert execution, and the sub-after-executors.

Figure: StackSaga sub executors and the main revert execution
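
The compensation order for a single command executor can also be sketched in plain Java. The names below (RevertOrderSketch, follow, revertOrder, and the executor labels) are hypothetical. The map stands in for the navigation described above: an entry plays the role of moving on to the next sub executor, and a missing entry plays the role of completing the chain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class RevertOrderSketch {

    /** Follows a chain of sub executors: an entry in the map names the successor, no entry ends the chain. */
    static List<String> follow(String start, Map<String, String> next) {
        List<String> chain = new ArrayList<>();
        for (String current = start; current != null; current = next.get(current)) {
            chain.add(current);
        }
        return chain;
    }

    /** Sub-before chain first, then the main revert, then the sub-after chain. A null start means "not configured". */
    static List<String> revertOrder(String beforeStart, String afterStart, Map<String, String> next) {
        List<String> order = new ArrayList<>(follow(beforeStart, next));
        order.add("main doRevert");
        order.addAll(follow(afterStart, next));
        return order;
    }

    public static void main(String[] args) {
        // Two sub-before-executors chained together and a single sub-after-executor.
        Map<String, String> next = Map.of("BeforeA", "BeforeB");
        System.out.println(revertOrder("BeforeA", "AfterA", next));
    }
}
```

Passing null for either start models a command executor that does not use the corresponding optional annotation, in which case only the remaining parts run.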

Code Example of Sub-Before-Executor

@SagaExecutor(executeFor = "order-service", value = "orderInitializeSubBeforeExecutor") (1)
public class OrderInitializeSubBeforeExecutor implements RevertBeforeExecutor<PlaceOrderAggregator, InitializeOrderExecutor> { (2)

    @Override (3)
    public RevertBeforeStepManager<PlaceOrderAggregator, InitializeOrderExecutor> doProcess(
            PlaceOrderAggregator finalAggregatorState,
            NonRetryableExecutorException nonRetryableExecutorException,
            RevertHintStore revertHintStore,
            RevertBeforeStepManagerUtil<PlaceOrderAggregator, InitializeOrderExecutor> stepManager,
            String idempotencyKey)
    throws RetryableExecutorException {
        // Implement the sub-before-execution logic in the same way as the command executor's doRevert method.
        return stepManager.complete(() -> "ORDER_INITIALIZATION_SUB_BEFORE_REVERTED"); (4)
    }
}
1 The executor is annotated with @SagaExecutor to register it as a Spring bean and to provide the necessary metadata: the target service that the executor runs against and the unique name of the executor.
2 The executor implements the RevertBeforeExecutor<A, C> interface, which makes it a sub-before-executor.
The first generic parameter is the aggregator class used in the entire transaction. The second is the command executor class whose main compensating execution this sub executor runs before.
3 The doProcess method is overridden to implement the sub-before-execution logic in the same way as the command executor’s doRevert method.
4 If the execution succeeds and there are no more sub-before-executors, navigate to the main (parent) doRevert method by using the stepManager.complete method.
Provide an event name supplier; the event name is stored in the event store to record the successful execution of the current sub-before-executor.
If you have more than one sub-before-executor, navigate to the next one by using the stepManager.next method.

Code Example of Reactive Sub-Before-Executor

@SagaExecutor(executeFor = "order-service", value = "reactiveRevertBeforeExecutor") (1)
@RequiredArgsConstructor
public class ReactiveOrderInitializeSubBeforeExecutor implements ReactiveRevertBeforeExecutor<PlaceOrderAggregator, ReactiveInitializeOrderExecutor> { (2)

    private final ReactiveOrderService reactiveOrderService;

    @Override (3)
    @NonNull
    public Mono<RevertBeforeStepManager<PlaceOrderAggregator, ReactiveInitializeOrderExecutor>> doProcess(
            PlaceOrderAggregator aggregator,
            NonRetryableExecutorException processException,
            RevertHintStore revertHintStore,
            RevertBeforeStepManagerUtil<PlaceOrderAggregator, ReactiveInitializeOrderExecutor> stepManager,
            String idempotencyKey
    ) {
        (4)
        return this.reactiveOrderService
                .doSomething()
                .thenReturn(stepManager.complete(() -> "ORDER_INITIALIZATION_SUB_BEFORE_REVERTED"))
                .onErrorResume(throwable -> {
                    if (throwable instanceof ResourceUnavailableException) {
                        return Mono.error(RetryableExecutorException.buildWith(throwable).build());
                    } else {
                        revertHintStore.put("ReactiveOrderInitializeSubBeforeExecutor:FAILED", processException.getMessage());
                        SagaExecutionEventName ignored = () -> "ignored";
                        return Mono.just(stepManager.complete(ignored));
                    }
                });
    }
}
1 The executor is annotated with @SagaExecutor to register it as a Spring bean and to provide the necessary metadata: the target service that the executor runs against and the unique name of the executor.
2 The executor implements the ReactiveRevertBeforeExecutor<A, C> interface, which makes it a reactive sub-before-executor. Generic A is the aggregator class used in the entire transaction, and generic C is the command executor class whose main compensating execution this sub executor runs before.
3 The doProcess method is overridden to implement the sub-before-execution logic in a reactive way.
4 The sub-before-execution logic is implemented inside the doProcess method as a reactive pipeline.
Unlike the imperative style, exceptions must not be thrown outside the pipeline. The method also must not return null or Mono.empty(). Doing so is treated as a non-retryable error and terminates the transaction.
If a non-retryable exception is possible, catch it, store it in the RevertHintStore, and return a dummy event name, as in the example above, to avoid terminating the transaction.

If the process succeeds and there are no more sub-before-executors for the respective command executor, navigate to the main (parent) doRevert method by using the stepManager.complete method. Provide an event name supplier; the event name is stored in the event store to record the successful execution of the current sub-before-executor.
If you have more than one sub-before-executor, navigate to the next one by using the stepManager.next method.
Pass the class of the next sub-before-executor and the event name that is stored in the event store to record the successful execution of the current sub-before-executor.

Code Example of Sub-After-Executor

@SagaExecutor(executeFor = "order-service", value = "orderInitializeSubAfterExecutor") (1)
public class OrderInitializeSubAfterExecutor implements RevertAfterExecutor<PlaceOrderAggregator, InitializeOrderExecutor> { (2)

    @Override (3)
    public RevertAfterStepManager<PlaceOrderAggregator, InitializeOrderExecutor> doProcess(
            PlaceOrderAggregator finalAggregatorState,
            NonRetryableExecutorException processException,
            RevertHintStore revertHintStore,
            RevertAfterStepManagerUtil<PlaceOrderAggregator, InitializeOrderExecutor> stepManager,
            String idempotencyKey
    ) throws RetryableExecutorException {
        return stepManager.complete(() -> "ORDER_INITIALIZATION_SUB_AFTER_REVERTED"); (4)
    }
}
1 The executor is annotated with @SagaExecutor to register it as a Spring bean and to provide the necessary metadata: the target service that the executor runs for and the unique name of the executor.
2 The executor implements the RevertAfterExecutor<A, C> interface, which makes it a sub-after-executor.
The first generic parameter is the aggregator class used throughout the transaction; the second is the command executor class whose main compensating execution this executor runs after.
3 The doProcess method is overridden to implement the sub-after-execution logic, in the same way as the command executor’s doRevert method.
4 If the execution succeeds, the executor finishes by calling stepManager.complete.
Pass it an event name supplier; the supplied name is stored in the event store to record the successful execution of the current sub-after-executor.
If you have more than one sub-after-executor, navigate to the next one by calling stepManager.next instead.
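
To make the difference between next and complete concrete, here is a minimal, self-contained toy model of a sub-executor chain. It does not use the StackSaga API: ToyStepManager, Step, the executor names and the event names are all hypothetical stand-ins, and executors are identified by strings rather than classes. It only illustrates the idea that each sub-executor records one event and either hands over to another sub-executor or ends the chain.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy model only: none of these types are StackSaga classes.
class SubExecutorChainSketch {

    /** Outcome of one sub-executor: the event to record and where to go next. */
    static final class Step {
        final String eventName;
        final String nextExecutor; // null means the chain is complete

        Step(String eventName, String nextExecutor) {
            this.eventName = eventName;
            this.nextExecutor = nextExecutor;
        }
    }

    /** Hypothetical stand-in for the step manager handed to doProcess. */
    static final class ToyStepManager {
        Step next(String nextExecutor, Supplier<String> eventName) {
            return new Step(eventName.get(), nextExecutor);
        }

        Step complete(Supplier<String> eventName) {
            return new Step(eventName.get(), null);
        }
    }

    /** Runs the chain starting at `first` and returns the recorded event names in order. */
    static List<String> run(Map<String, Function<ToyStepManager, Step>> executors, String first) {
        List<String> eventStore = new ArrayList<>();
        ToyStepManager stepManager = new ToyStepManager();
        String current = first;
        while (current != null) {
            Step step = executors.get(current).apply(stepManager);
            eventStore.add(step.eventName); // one event per successful sub-executor
            current = step.nextExecutor;
        }
        return eventStore;
    }

    static List<String> demo() {
        Map<String, Function<ToyStepManager, Step>> executors = new LinkedHashMap<>();
        // First sub-executor hands over to a second one.
        executors.put("releaseCoupon", sm -> sm.next("notifyCustomer", () -> "COUPON_RELEASED"));
        // Last sub-executor ends the chain.
        executors.put("notifyCustomer", sm -> sm.complete(() -> "CUSTOMER_NOTIFIED"));
        return run(executors, "releaseCoupon");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the event list ends up holding one entry per sub-executor, in execution order, which mirrors how each successful sub-executor contributes its own event name.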

Code Example of Reactive-Sub-After-Executor

@SagaExecutor(executeFor = "order-service", value = "reactiveOrderInitializeSubAfterExecutor") // (1)
public class ReactiveOrderInitializeSubAfterExecutor implements ReactiveRevertAfterExecutor<PlaceOrderAggregator, ReactiveInitializeOrderExecutor> { // (2)

    private final ReactiveOrderService reactiveOrderService;

    // Constructor injection so the final field is initialized.
    public ReactiveOrderInitializeSubAfterExecutor(ReactiveOrderService reactiveOrderService) {
        this.reactiveOrderService = reactiveOrderService;
    }

    @Override
    @NonNull // (3)
    public Mono<RevertAfterStepManager<PlaceOrderAggregator, ReactiveInitializeOrderExecutor>> doProcess(
            PlaceOrderAggregator finalAggregatorState,
            NonRetryableExecutorException processException,
            RevertHintStore revertHintStore,
            RevertAfterStepManagerUtil<PlaceOrderAggregator, ReactiveInitializeOrderExecutor> stepManager,
            String idempotencyKey
    ) {
        // (4)
        return this.reactiveOrderService
                .doSomething()
                .thenReturn(stepManager.complete(() -> "ORDER_INITIALIZATION_SUB_AFTER_EXECUTED"))
                .onErrorResume(throwable -> {
                    if (throwable instanceof ResourceUnavailableException) {
                        // Retryable failure: signal it through the pipeline.
                        return Mono.error(RetryableExecutorException.buildWith(throwable).build());
                    } else {
                        // Non-retryable failure: record a hint and complete with a dummy event name.
                        revertHintStore.put("ReactiveOrderInitializeSubAfterExecutor:FAILED", processException.getMessage());
                        SagaExecutionEventName ignored = () -> "ignored";
                        return Mono.just(stepManager.complete(ignored));
                    }
                });
    }
}
1 The executor is annotated with @SagaExecutor to register it as a Spring bean and to provide the necessary metadata: the target service that the executor runs for and the unique name of the executor.
2 The executor implements the ReactiveRevertAfterExecutor<A, C> interface, which makes it a reactive sub-after-executor. Generic A is the aggregator class used throughout the transaction, and generic C is the command executor class whose main compensating execution this executor runs after.
3 The doProcess method is overridden to implement the sub-after-execution logic in a reactive way.
4 The sub-after-execution logic is implemented inside the doProcess method as a reactive pipeline.
Unlike the imperative style, an exception cannot be thrown out of the pipeline; it has to be signalled through the pipeline itself (for example with Mono.error). The method also must not return null or Mono.empty(): doing so is treated as a non-retryable error and terminates the transaction.
If a non-retryable exception is possible, catch it, store it in the RevertHintStore, and return a dummy event name as in the example above, to avoid terminating the transaction.

If the process succeeds and there are no more sub-after-executors for the respective command executor, finish by calling stepManager.complete. Pass it an event name supplier; the supplied name is stored in the event store to record the successful execution of the current sub-after-executor.
If you have more than one sub-after-executor, navigate to the next one by calling stepManager.next.
Pass it the sub-after-executor class to navigate to, along with the event name that is stored in the event store to record the successful execution of the current sub-after-executor.

Summary

Retryable Executor Exceptions are allowed for the following executors.

Executor DoProcess() Method doRevert() Method

Query Executor

Command Executor

Revert Before Executor

Revert After Executor

Non-Retryable Executor Exceptions are allowed for the following executors.

Executor DoProcess() Method doRevert() Method

Query Executor

Command Executor

Revert Before Executor

Revert After Executor

Guidelines for Creating Executors

Each saga executor should encapsulate a single atomic transaction. This means you must not implement multiple atomic transactions within the same executor.

The primary reason for this restriction is that the executor acts as a retryable unit managed by the Saga Orchestration Engine (SEC). If an executor contains multiple atomic transactions and a failure occurs, the SEC cannot determine which specific transaction failed. For example, if an executor performs three atomic transactions and the third one fails, retrying the executor will re-execute the first and second transactions, potentially leading to duplicate operations if those steps are not idempotent.

This approach can result in data anomalies such as redundancy, integrity violations, and consistency issues. Additionally, if a compensating (rollback) action is required, the SEC lacks the granularity to identify which atomic transaction needs to be reverted, since it treats the executor as a single atomic unit.
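
The duplicate-operation problem described above can be reproduced with a small, self-contained simulation. This is only a sketch: the class, the in-memory "committed" ledger and the single-retry loop are hypothetical stand-ins for real services and for the engine's retry behaviour, not StackSaga code.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation only: shows why one executor must not hold several atomic transactions.
class MultiTransactionRetrySketch {

    /** Side effects already committed to the (simulated) service databases. */
    static final List<String> committed = new ArrayList<>();
    static boolean paymentServiceUp = false;

    /** Anti-pattern: three atomic transactions inside one retryable unit. */
    static void executorWithThreeTransactions() {
        committed.add("order-initialized"); // transaction 1 commits
        committed.add("stock-reduced");     // transaction 2 commits
        if (!paymentServiceUp) {
            // transaction 3 fails; transactions 1 and 2 stay committed
            throw new IllegalStateException("payment-service unavailable");
        }
        committed.add("payment-made");      // transaction 3 commits
    }

    /** Minimal stand-in for a retry: the whole executor is invoked again. */
    static List<String> runWithOneRetry() {
        committed.clear();
        paymentServiceUp = false;
        try {
            executorWithThreeTransactions();
        } catch (IllegalStateException firstAttemptFailed) {
            paymentServiceUp = true;         // the service recovers before the retry
            executorWithThreeTransactions(); // the retry replays transactions 1 and 2
        }
        return new ArrayList<>(committed);
    }

    public static void main(String[] args) {
        // prints [order-initialized, stock-reduced, order-initialized, stock-reduced, payment-made]
        System.out.println(runWithOneRetry());
    }
}
```

Because the first two transactions are not idempotent in this sketch, the retry commits them a second time. Splitting the work into three executors would let the retry target only the failed step.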

Executions Classifying Tips

When creating executors, you have to decide whether each one is a command-executor or a query-executor. The following chart helps with that decision (from the database perspective).

In short: if the atomic execution is read-only, implement it in a query-executor; if it changes state in any database, implement it in a command-executor.

| Operation | Has a Revert | Executor Type |
|---|---|---|
| C - Create | YES | Command-Executor |
| R - Read | NO | Query-Executor |
| U - Update | YES | Command-Executor |
| D - Delete | YES | Command-Executor |
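
The chart above can also be written down as a small lookup. The sketch below is illustrative only; CrudOperation and ExecutorType are hypothetical names introduced for this example and are not part of StackSaga.

```java
// Illustrative lookup for the classification chart; not a StackSaga API.
class ExecutorClassificationSketch {

    enum ExecutorType { COMMAND_EXECUTOR, QUERY_EXECUTOR }

    enum CrudOperation {
        CREATE(true), READ(false), UPDATE(true), DELETE(true);

        private final boolean changesState;

        CrudOperation(boolean changesState) {
            this.changesState = changesState;
        }

        /** A state-changing operation needs a compensating (revert) execution. */
        boolean hasRevert() {
            return changesState;
        }

        /** Read-only operations go to a query-executor, everything else to a command-executor. */
        ExecutorType executorType() {
            return changesState ? ExecutorType.COMMAND_EXECUTOR : ExecutorType.QUERY_EXECUTOR;
        }
    }

    public static void main(String[] args) {
        for (CrudOperation op : CrudOperation.values()) {
            System.out.println(op + " -> revert=" + op.hasRevert() + ", type=" + op.executorType());
        }
    }
}
```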

For instance, let’s classify the executions that we have in our placing-order example.

| Execution | Executor Type | Reason |
|---|---|---|
| Collecting the user’s delivery details | Query-Executor | Fetching data has no impact on the user-service’s database. |
| Initializing the order | Command-Executor | The order should be canceled if any subsequent atomic transaction fails after the order is initialized. |
| Making the Pre-Auth | Command-Executor | The Pre-Auth should be canceled if any subsequent atomic transaction fails after the Pre-Auth is made. |
| Updating the stock | Command-Executor | The stock should be restored if any subsequent atomic transaction fails after the stock is reduced. |
| Making the real payment | Command-Executor | The payment should be refunded if any subsequent atomic transaction fails after the payment is made. |

In the placing-order example, no atomic operation follows the payment. In principle, though, making the payment should still be executed within a command-executor: if another atomic process is added after the payment in the future, you will need a compensating execution for the payment.

Combine multiple atomic executions

There are two ways to implement multiple atomic operations in a saga executor. As you already know, StackSaga has two types of atomic executions: command executions and query executions. Depending on the use case, multiple query executions can sometimes be placed together in the same executor.

Using multiple read-only atomic operations inside the same executor can reduce the event sourcing overhead, because after each executor the Saga engine stores the new state of the aggregator as a new event in the database. For instance, if you implement 3 read-only atomic operations in the same executor, the event store is updated once instead of 3 times, which saves 2 event-store writes compared with placing them in 3 separate executors.
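
The saving can be illustrated with a small counting sketch. It assumes only what is stated above, namely that one event is stored after each executor; the class, the list-based aggregator and the three read operations are hypothetical and do not use the StackSaga API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Counting sketch: one simulated event-store write per executor.
class EventStoreOverheadSketch {

    /** Runs the executors in order and counts the aggregator snapshots that get stored. */
    static int eventStoreWrites(List<UnaryOperator<List<String>>> executors) {
        List<String> aggregator = new ArrayList<>();
        int writes = 0;
        for (UnaryOperator<List<String>> executor : executors) {
            aggregator = executor.apply(aggregator);
            writes++; // the new aggregator state is stored after each executor
        }
        return writes;
    }

    /** Simulated read-only operation: adds the fetched data to a copy of the aggregator. */
    static List<String> read(List<String> aggregator, String what) {
        List<String> copy = new ArrayList<>(aggregator);
        copy.add(what);
        return copy;
    }

    /** Three query executors, one read each. */
    static int separateExecutors() {
        UnaryOperator<List<String>> fetchUser = a -> read(a, "user-details");
        UnaryOperator<List<String>> fetchAddress = a -> read(a, "delivery-address");
        UnaryOperator<List<String>> fetchPreferences = a -> read(a, "preferences");
        return eventStoreWrites(List.of(fetchUser, fetchAddress, fetchPreferences));
    }

    /** One query executor that performs all three reads. */
    static int combinedExecutor() {
        UnaryOperator<List<String>> fetchAll =
                a -> read(read(read(a, "user-details"), "delivery-address"), "preferences");
        return eventStoreWrites(List.of(fetchAll));
    }

    public static void main(String[] args) {
        System.out.println("separate: " + separateExecutors()); // prints separate: 3
        System.out.println("combined: " + combinedExecutor());  // prints combined: 1
    }
}
```

The combined variant trades retry granularity for fewer writes, which is acceptable here only because the operations are read-only and safe to repeat.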

First way:

stacksaga diagram combine multiple executor option 1.drawio

Second way:

stacksaga diagram combine multiple executor option 2.drawio