In the section Java Collection Framework, we might know about the Queue in Java which maintains the first-in-first-out order. It can be defined as an ordered list that is used to hold the elements which are about to be processed.
Let's take an example like this: Assume that you haven an application which provide a functionality for sending emails. However, calling this api for sending an email and it will take some time (Ex: about 6 seconds) and if there are many users call this api for sending email, then afterward users have to wait for a long time and maybe they will receive the timeout error. The Queue will help us keep these messages and will handle it one by one later with first-in-first-out order. So after the use send the request message then the application add this message into the Queue and will handle it later, then the user don't have to wait anymore.
However, with the default Queue that Java provided, all the messages in that Queue will be stored on memory and if the application is stopped or restarted, then all these messages will be lost. Moreover, if there are too many messages in the Queue then the application may need a lot of RAM. Because Java can only hold so much information before the heap becomes a limiting factor with high-impacting garbage collector as a result.
So, Chronicle Queue will help us to solve these problems, the Chronicle Queue persists every single message using a memory-mapped file. This allows us to share messages between processes. It stores data directly to off-heap memory, therefore, making it free of GC overhead. It is designed for providing low-latency message framework for high-performance applications.
So before going to the example let's take a look about the example that we are going to do. Assume that have a service for sending emails when our micro service system got errors and sending an email is taking time, about 6 seconds. Because there are many services into our system so there are many errors can happen at the same time. So we have to build a service which have to
Avoid overloading and request timeout when there are many requests at the same time.
Avoid lost messages when the service is stopped or restarted.
Have the ability to continue handing on the last handled message when the service is stopped or restarted.
So below is the diagram that we can achieve those things by using the support of Chronicle Queue which will help us to store queue messages on disk.
So, as you can see in the diagram, we assume that the user use postman to send messages to our Spring Boot service, then these message will be put into the Chronicle Queue which is stored in a file on disk.
Then there is a method that will be executed every second (base on our configuration) to check the message queue. However, before checking the message in the queue it will get the last index in an other Chronicle queue which is used for storing handled index message. So with this index, the method will continue to get the upcoming message in the queue and handle it although the service is stop and restarted.
Then after handled the message in the queue, the method will put the previous index to the handled index queue to keep the progress can be continue if the service is restarted.
As you can see, we will configure 2 beans for 2 queue, the first one is the errorDetailQueue and the second one is the errorDetailQueueIndex.
Firstly, we will config the basePath which is the location that the file which contain queue messages is stored. In this example, we will chose the location for storing the Chronicle Queue file in our current project.
Secondly, we will use ChronicleQueueBuilder to build the Chronicle instance. Then now, for using the Chronicle Queue, we just need to @Autowired it.
Because we are going to create a method which will be executed every second to check and handle message in queue, so we need to enable EnableScheduling in our Spring Boot Service. So let's create an configuration class as below.
Then let's create a service class ChronicleQueueService which will provide some method for using Chronicle Queue.
addToQueue: This method is used for pushing a message into the queue.
readAllFromQueue: This method is used for getting all the message from the queue.
handleNextItemFromQueue: This method is used for read a message in queue and update the index of this message into the index queue which mean we don't have to read message from the beginning when the service is restarted because we saved the last index of read message.
sendNotifiedEmail: This method is used for simulating reading messages in queue and handle them one by one until all messages are handled, it will use the method getNextItemFromQueue inside.
Let's create the ChronicleQueueService.java as below.
So for the addToQueue method, we will need to @Autowired the errorDetailQueue then we will use it to create the ExcerptAppender in which
ExcerptAppender is the main data container in a Chronicle Queue. In other words, each Chronicle Queue is composed of excerpts. Writing message to a Chronicle Queue means starting a new excerpt, writing message into it, and finishing the excerpt at the end.
So the method appender.startExcerpt(); is used to start an excerpt with the default message capacity of 128K (can be configured) .
Then for the method appender.writeUTFΔ(errorDetailJson);, it receive a String message and it will encoding this message with modified UTF-8 and push it into the queue and the length of this message is no limit.
Next, for the readAllFromQueue method we will do it as below.
For the readAllFromQueue method, we will use theerrorDetailQueue to create the ExcerptTailer in which.
ExcerptTailer is an excerpt reader optimized for sequential reads. It can perform sequential and random reads, both forwards and backwards. Tailers read the next available message each time they are called. The followings are guaranteed in Chronicle Queue.
So the method errorDetailQueue.createTailer() is used to create an ExcerptTailer .
Then we can use tailer.nextIndex() to check the next available message in the queue.
Then we use tailer.readUTFΔ() to read the message that we put into the queue using appender.writeUTFΔ(<StringData>).
Finally, we use tailer.finish(); to finish reading message in queue.
Next for the method handleNextItemFromQueue we would like to add the codes as below.
So now, as you can see, we will add one more property for the ChronicleQueueService, it is the currentIndex. So to Initialize the current index we use the getCurrentIndex method to retrieve the last written index of the errorDetailQueueIndex Chronicle Queue and assigns it to the currentIndex field. This method is called during bean initialization using the @PostConstruct annotation.
The handleNextItemFromQueue method creates an ExcerptTailer for the errorDetailQueue Chronicle Queue and seeks to the next item based on the currentIndex. If an item is found, it will be deserialized to an ErrorDetail instance and the new index is written to the errorDetailQueueIndex Chronicle Queue using an ExcerptAppender.
Next for the method sendNotifiedEmail we would like to add the codes as below.
packagecom.springboot.project.chronicle.queue.app.service;importcom.fasterxml.jackson.databind.ObjectMapper;importcom.springboot.project.chronicle.queue.app.model.ErrorDetail;importlombok.extern.slf4j.Slf4j;importnet.openhft.chronicle.Chronicle;importnet.openhft.chronicle.ExcerptAppender;importnet.openhft.chronicle.ExcerptTailer;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.scheduling.annotation.Scheduled;importorg.springframework.stereotype.Service;importjavax.annotation.PostConstruct;importjava.io.IOException;importjava.time.OffsetDateTime;importjava.util.ArrayList;importjava.util.List;importjava.util.Objects;importjava.util.UUID;@Slf4j@ServicepublicclassChronicleQueueService{privatelongcurrentIndex;@AutowiredprivateObjectMapperobjectMapper;@Autowired@Qualifier("errorDetailQueue")privateChronicleerrorDetailQueue;@Autowired@Qualifier("errorDetailQueueIndex")privateChronicleerrorDetailQueueIndex;@PostConstructpublicvoidgetCurrentIndex(){this.currentIndex=this.errorDetailQueueIndex.lastWrittenIndex();}publicvoidaddToQueue(){try{ExcerptAppenderappender=this.errorDetailQueue.createAppender();appender.startExcerpt();ErrorDetailerrorDetail=this.createErrorDetail();StringerrorDetailJson=this.objectMapper.writeValueAsString(errorDetail);appender.writeUTFΔ(errorDetailJson);appender.finish();}catch(IOExceptionex){thrownewRuntimeException(ex);}}publicList<ErrorDetail>readAllFromQueue(){try{List<ErrorDetail>errorDetails=newArrayList<>();ExcerptTailertailer=this.errorDetailQueue.createTailer();while(tailer.nextIndex()){ErrorDetailerrorDetail=this.objectMapper.readValue(tailer.readUTFΔ(),ErrorDetail.class);errorDetails.add(errorDetail);}tailer.finish();returnerrorDetails;}catch(IOExceptione){thrownewRuntimeException(e);}}publicErrorDetailhandleNextItemFromQueue(){try{ExcerptTailertailer=this.errorDetailQueue.createTailer();if(tailer.index(1+this.currentIndex)){ErrorDetailerrorDetail=this.objectMapper.readValue(tailer.readUTFΔ(),ErrorDetail.class);ExcerptAppenderindexAppender=this.errorDetailQueueIndex.createAppender();indexAppender.startExcerpt();indexAppender.writeUTF(String.valueOf(tailer.index()));this.currentIndex=tailer.index();indexAppender.finish();tailer.finish();returnerrorDetail;}returnnewErrorDetail();}catch(IOExceptione){thrownewRuntimeException(e);}}@Scheduled(fixedDelay=1000)publicvoidsendNotifiedEmail(){try{Thread.sleep(6000);ErrorDetailerrorDetail=this.handleNextItemFromQueue();if(Objects.isNull(errorDetail.getId())){log.info("No Error Item In Queue: Queue is empty!");return;}log.info("Sent Notified Email id: "+errorDetail.getId()+" With Timestamp: "+errorDetail.getTimestamp());}catch(InterruptedExceptione){e.printStackTrace();}}privateErrorDetailcreateErrorDetail(){ErrorDetailerrorDetail=newErrorDetail();errorDetail.setId(UUID.randomUUID());errorDetail.setErrorCode(403);errorDetail.setErrorMessage("Error Message");errorDetail.setTimestamp(OffsetDateTime.now());returnerrorDetail;}}
So in the sendNotifiedEmail method, which is annotated with @Scheduled to retrieve the next ErrorDetail object from the queue that we handled in the handleNextItemFromQueue() every second and logs a message indicating that an email has been sent.
The api POST /v1/chronicle-queue/error-details will call the addToQueue method from the ChronicleQueueService class to add an error detail to the Chronicle Queue.
The api GET /v1/chronicle-queue/error-details will call the readAllFromQueue method from the ChronicleQueueService class to get a list of all error details stored in the Chronicle Queue.
The api GET /v1/chronicle-queue/error-details/polls will call the handleNextItemFromQueue method from the ChronicleQueueService class to get and handle the next error detail from the Chronicle Queue.
Now, let's start our application and use postman to call the api POST /v1/chronicle-queue/error-details 6 times to add 6 messages into the Chronicle Queue as in the image below.
Then let's wait the method sendNotifiedEmail() handle about 3 messages in the Queue then we will stop our application.
Now, let's start our application again, then you can see our application will continue to handle the next messages in the queue.
In this section we learned how to implement a Chronicle Queue in a Spring Boot application and we also resolved issues that we put in the beginning of the example. We built an Spring Application Service which
Avoided overloading and request timeout when there are many requests at the same time.
Avoided lost messages when the service is stopped or restarted.
had the ability to continue handing on the last handled message when the service is stopped or restarted.
Using Chronicle Queue also has some disadvantages that are when we deploy our Spring Boot application in multi instances environment which can make we have separated queues in every instance and we when we deploy our Spring Boot application but we don't provide a persistence volume for it then the all the data will be lost after every application deployment.