In microservices, or any other event-based architecture, some use cases require a service to change its own local database and also publish an event, which is then consumed by other services. For the system to stay consistent, these two actions must be performed atomically: either both complete successfully or neither does.
An elegant way to solve this is the Outbox Pattern. It uses a database table (if your service uses a relational database), usually called the outbox table, to store the events. This lets you include the SQL insert of the event in the use case's local transaction. A separate runner then periodically checks whether the outbox table has entries and processes them by publishing the events to a message broker.
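To make the write side of the pattern concrete, here is a minimal plain-JDBC sketch with no library involved. The `users` and `outbox` tables, their columns, and the `OutboxWriteSketch` class are assumptions made up for illustration; the only point is that the business insert and the event insert share one transaction.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Hypothetical sketch: business write and outbox insert in one local transaction. */
final class OutboxWriteSketch {
    // Assumed tables: users(id, user_name, email) and outbox(payload).
    static final String INSERT_USER =
        "insert into users(id, user_name, email) values (?, ?, ?)";
    static final String INSERT_EVENT =
        "insert into outbox(payload) values (?)";

    static void createUser(Connection conn, long id, String userName,
                           String email, String eventJson) throws SQLException {
        boolean previousAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false); // start a local transaction
        try (PreparedStatement user = conn.prepareStatement(INSERT_USER);
             PreparedStatement event = conn.prepareStatement(INSERT_EVENT)) {
            user.setLong(1, id);
            user.setString(2, userName);
            user.setString(3, email);
            user.executeUpdate();   // the business change

            event.setString(1, eventJson);
            event.executeUpdate();  // the event, stored in the outbox table

            conn.commit();          // both rows are committed together
        } catch (SQLException e) {
            conn.rollback();        // neither row is kept
            throw e;
        } finally {
            conn.setAutoCommit(previousAutoCommit);
        }
    }
}
```

If the second insert fails, the rollback also discards the new user row, so no user exists without its event and no event exists without its user.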
I have implemented a Java library called JQueue that makes this pattern easier to implement. JQueue implements a FIFO data structure on top of a relational database table, so it currently works only if your service’s database is relational.
JQueue has two modules, the pushing module and the runner module. To push an event (or task) into the queue, you can do this:
JTxQueue.queue(/* a JDBC DataSource or a JDBC Connection */)
    .push(
        "{\"type\": \"job_type1\", \"event\":{\"id\": \"an id\", \"value\": \"\" }}");
Note that you must pass the DataSource or Connection that your local transaction is currently using. That way, if the transaction commits, the push is committed with it; if anything fails, everything is rolled back. The event or task that you push into the queue can be any text.
Then, to consume events or tasks from the queue you have to write something like this:
JQueueRunner.runner(/* a JDBC DataSource */)
    .executeAll(new Job() {
        @Override
        public void run(String data) {
            // do something with data
        }
    });
The code above processes all the entries in the queue, one by one, in a loop until the queue is empty. For each entry it reads the data and calls the Job instance you provide, which can do whatever you need with it, such as publishing a message to a message broker or calling an external service API. You can use any scheduler library (Quartz, for instance) to run the JQueue runner as often as needed to keep the queue empty. If your Job instance throws an exception, the task is pushed back into the queue and its processing is delayed by five minutes.
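The behaviour described above (drain the queue, re-queue failures with a five-minute delay) can be modelled in memory. This is only a sketch of the semantics, not JQueue's actual implementation: the `RunnerLoopSketch` class, its `Entry` type, and passing the current time as a parameter are all assumptions made to keep the example self-contained.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** In-memory model of the runner behaviour; not JQueue's actual code. */
final class RunnerLoopSketch {
    /** A queued task and the earliest instant at which it may run. */
    static final class Entry {
        final String data;
        final Instant notBefore;
        Entry(String data, Instant notBefore) {
            this.data = data;
            this.notBefore = notBefore;
        }
    }

    static final Duration RETRY_DELAY = Duration.ofMinutes(5);
    private final Deque<Entry> queue = new ArrayDeque<>();

    void push(String data, Instant now) {
        queue.addLast(new Entry(data, now));
    }

    /** Runs every entry that is due at 'now' in FIFO order; returns how many succeeded. */
    int executeAll(Consumer<String> job, Instant now) {
        int done = 0;
        Deque<Entry> postponed = new ArrayDeque<>();
        while (!queue.isEmpty()) {
            Entry entry = queue.pollFirst();
            if (entry.notBefore.isAfter(now)) {
                postponed.addLast(entry); // not due yet, keep it for a later run
                continue;
            }
            try {
                job.accept(entry.data);
                done++;
            } catch (RuntimeException ex) {
                // failed: back into the queue, delayed by five minutes
                postponed.addLast(new Entry(entry.data, now.plus(RETRY_DELAY)));
            }
        }
        queue.addAll(postponed);
        return done;
    }

    int size() {
        return queue.size();
    }
}
```

A scheduler such as Quartz, or the JDK's `ScheduledExecutorService.scheduleWithFixedDelay`, could then call `executeAll` at a fixed interval.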
The runner uses the SQL “select for update skip locked” clause, a relatively recent feature that some relational databases have added and that is useful, among other things, for implementing queues on relational tables. This is one of the reasons JQueue currently works with PostgreSQL v9.5+ and MySQL 8.0+ (support for Oracle and MS SQL is coming soon, as both offer an equivalent of skip locked).
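To illustrate how skip locked lets several runners share one table, here is a hedged JDBC sketch. The `task_queue` table, its `id` and `data` columns, and the `SkipLockedSketch` class are hypothetical and not JQueue's real schema or code; the query syntax shown is the form accepted by PostgreSQL 9.5+ and MySQL 8.0+.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.function.Consumer;

/** Hypothetical SKIP LOCKED consumer; table and column names are assumptions. */
final class SkipLockedSketch {
    // Rows already locked by another transaction are skipped instead of blocking.
    static final String CLAIM_NEXT =
        "select id, data from task_queue order by id limit 1 for update skip locked";
    static final String DELETE_TASK = "delete from task_queue where id = ?";

    /** Claims one task, runs the job, deletes the task; returns false if none was claimable. */
    static boolean processNext(Connection conn, Consumer<String> job) throws SQLException {
        conn.setAutoCommit(false);
        try (PreparedStatement claim = conn.prepareStatement(CLAIM_NEXT);
             ResultSet rs = claim.executeQuery()) {
            if (!rs.next()) {
                conn.commit();
                return false;
            }
            long id = rs.getLong("id");
            job.accept(rs.getString("data"));
            try (PreparedStatement delete = conn.prepareStatement(DELETE_TASK)) {
                delete.setLong(1, id);
                delete.executeUpdate();
            }
            conn.commit();   // releases the row lock
            return true;
        } catch (SQLException | RuntimeException e) {
            conn.rollback(); // the row stays queued and can be claimed again
            throw e;
        }
    }
}
```

Two runners calling `processNext` at the same time would each lock a different row, so neither waits for the other and no task is handed to both.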
Let’s now show some examples of how you can push events within your local transaction. Suppose that your service creates users and when that happens you need to publish the NewUserEvent. If your service is using plain JDBC, you can do something like this:
Connection conn = connection();
try {
    conn.setAutoCommit(false);
    // your business logic first ("users" avoids the reserved word "user" in PostgreSQL)
    try (PreparedStatement st = conn.prepareStatement(
            "insert into users(id, user_name, pwd, email) values(108, 'user1','anyPassword','user1@dot.com')")) {
        st.executeUpdate();
    }
    // then push an event on the same connection, so it joins the transaction
    JTxQueue.queue(conn)
        .push(new NewUserEvent(108, "user1", "user1@dot.com").toJson());
    conn.commit();
} catch (SQLException | JQueueException e) {
    try {
        conn.rollback();
    } catch (SQLException e1) {
        // keep the original failure and attach the rollback failure to it
        e.addSuppressed(e1);
    }
    throw new RuntimeException(e);
} finally {
    try {
        conn.setAutoCommit(true);
        conn.close();
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }
}
If your service uses JPA/Hibernate, you can do something like this:
EntityManagerFactory emf =
    Persistence.createEntityManagerFactory("...");
EntityManager em = emf.createEntityManager();
EntityTransaction tx = em.getTransaction();
try {
    tx.begin();
    // your business logic first
    User u = new User("username1", "pwd1", "user@dot.com");
    em.persist(u);
    // then push an event on the JDBC connection Hibernate is using
    Session session = em.unwrap(Session.class);
    session.doWork(new Work() {
        @Override
        public void execute(Connection connection) throws SQLException {
            JTxQueue.queue(connection)
                .push(new NewUserEvent(u.id(), u.userName(), u.email()).toJson());
        }
    });
    tx.commit();
} catch (Exception e) {
    // roll back only if the transaction is still active
    if (tx.isActive()) {
        tx.rollback();
    }
    throw new RuntimeException(e);
} finally {
    if (em != null && em.isOpen())
        em.close();
    if (emf != null)
        emf.close();
}
And, if your service uses Spring, you can do this:
@RestController
@RequestMapping("/api/users")
public class UserController {
    @Autowired
    private UserRepository userRepository;
    @Autowired
    private DataSource dataSource;
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    @Transactional
    public User create(@RequestBody User user) throws SQLException {
        // your business logic first
        User u = userRepository.save(user);
        // then push an event (assumes the DataSource yields the connection bound to this transaction)
        JTxQueue.queue(dataSource)
            .push(new NewUserEvent(u.id(), u.userName(), u.email()).toJson());
        return u;
    }
}
In all the examples above, the transaction wraps your business logic plus the push into the queue.
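The examples above call `new NewUserEvent(...).toJson()` without showing the class. Here is one possible shape for it, as a sketch only: the field names, the `new_user` type value, and the JSON layout (loosely modelled on the payload in the first push example) are assumptions, and a real service would more likely use a JSON library such as Jackson than hand-written escaping.

```java
/** Hypothetical event class; field names and JSON layout are assumptions. */
final class NewUserEvent {
    private final long id;
    private final String userName;
    private final String email;

    NewUserEvent(long id, String userName, String email) {
        this.id = id;
        this.userName = userName;
        this.email = email;
    }

    /** Serializes the event as the text payload that gets pushed into the queue. */
    String toJson() {
        return "{\"type\":\"new_user\",\"event\":{"
            + "\"id\":" + id
            + ",\"userName\":\"" + escape(userName) + "\""
            + ",\"email\":\"" + escape(email) + "\"}}";
    }

    // Escapes backslashes, double quotes, and control characters for JSON strings.
    private static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }
}
```

Since the queue accepts any text, the consumer side only needs to agree on this layout in order to parse the `data` string it receives.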
JQueue was inspired by Yii2 Queue, an excellent PHP library. I hope it makes implementing the Outbox Pattern easier and simpler!