Библиотека состоит из трех функциональных модулей:
-
Клиентский модуль
ModuleClientПринимает и оправляет сообщения другим клиентам -
Модуль статусов и статистики
StatusClientЗаписывает сообщения об ошибках или успешных приемах/отправках сообщений Клиентских модулей -
Подключение к хранилищу
StorageConnectionЗагрузка и выгрузка файлов Min.io
Принимает и оправляет сообщения другим клиентам
repositories {
maven { url "https://nexus.smk-systems.ru/repository/common-utils/" }
}
dependencies {
implementation 'com.rabbitmq:amqp-client:5.20.0'
implementation 'ru.smk-systems.svev:mqlib:1.1.4'
}Создаём client (экземпляр класса ModuleClient).
import ru.smk_systems.svev.mqlib.ModuleClient;
import ru.smk_systems.svev.mqlib.models.ModuleName;
ModuleClient сlient = new ModuleClient(
new ModuleName("web"), "5machines",
"amqp://guest:[email protected]:5672"
);web - имя текущего модуля (экземпляр класса ModuleName);
5machines - префикс (контур, к которому планируется подключение)
amqp://guest:[email protected]:5672 - ссылка, содержащая учетные данные, домен и порт сервера RabbitMQ;
Строка mqlib-queue-, Префикс и имя текущего модуля составляют собой имя очереди, из которой client (с
помощью методов client.receive() и client.receiveAsync()) забирает сообщения.
Например: mqlib-queue-5machines-broker-01
Очереди декларируются автоматически в момент в выполнения методов client.receive() или client.receiveAsync(). Если
послать сообщение клиентскому модулю короый не разу не выполнял методы client.receive()
или client.receiveAsync(), то сообщение уйдет в не существующую очередь и потеряется. Для того чтобы избежать
таких ситуации предусмотрен статический метод Condition.declare()
В примере декларируются очереди для нескольких Клиентских модулей сразу
import ru.smk_systems.svev.mqlib.models.ModuleName;
import ru.smk_systems.svev.mqlib.common.Condition;
public static void main(String[] args) {
Condition.declare(
client.getChannel(),
new ModuleName(Arrays.asList( "xml", "broker", "web" )),
"5machines"
);
}import ru.smk_systems.svev.mqlib.models.MessageGetResponse;
import ru.smk_systems.svev.mqlib.models.Message;
while (true) {
MessageGetResponse response = client.receive();
if (response == null) break;
Message message = response.getMessage();
System.out.println(message.getBody() + "\n\nmessageCount: " + response.getMessageCount());
if (response.getMessageCount() == 0) break;
}client.receive() - возвращает экземпляр класса MessageGetResponse.
response.getMessage() - возвращает сообщение по-одному (класс Message)
response.getMessageCount() - сколько еще сообщений осталось. при response.getMessageCount() == 0 выходим из цикла.
import ru.smk_systems.svev.mqlib.models.Message;
client.receiveAsync((deliveryTag, incomingMessage) -> {
System.out.println(incomingMessage.getBody());
});client.receiveAsync() cоздаёт отдельный поток, который завершается вместе с завышением основного
потока приложения
incomingMessage экземпляр класса Message;
Записывает сообщения об ошибках или успешных приемах/отправках сообщений Клиентских модулей.
Загрузка и выгрузка файлов Min.io
repositories {
maven { url "https://nexus.smk-systems.ru/repository/common-utils/" }
}
dependencies {
implementation 'com.rabbitmq:amqp-client:5.20.0'
implementation 'org.postgresql:postgresql:42.7.3' // Only for Status client
implementation 'io.minio:minio:8.5.11' // Storage client
implementation 'io.minio:minio-admin:8.5.11' // Storage client
implementation 'ru.smk-systems.svev:mqlib:1.1.4'
}- MinIO WebUI
minioadmin:minioadmin
dependencies {
implementation 'io.minio:minio:8.5.11'
implementation 'io.minio:minio-admin:8.5.11'
}import ru.smk_systems.svev.mqlib.common.StorageConnection;
public static void main(String[] args) {
// StorageConnection storage = new StorageConnection("http://minio:[email protected]:9000"); // Bucket Name: mqlib
StorageConnection storage = new StorageConnection("http://minio:[email protected]:9000", "mqlib");
// {@link StorageConnection#path} Autogenerated by constructor
// Set current path
storage.setPath(message.getId());
// or
storage.setPath(UUID.fromString("63ffd87a-daf0-4571-b288-403063e581b5"));
// Upload
storage.upload("README_01.txt", "Text content in UTF-8");
storage.upload("README_02.txt", new File("../README.md"));
storage.upload("README_03.txt", new InputStream());
storage.upload("README_04.txt", new Byte[]{});
// Download
List<String> filenames = storage.list();
for(String name : filenames) {
URL link = storage.getLink(name); // like that http://minio:[email protected]:9000/mqlib/63ffd...
InputStream stream = storage.get(name);
String content = storage.getAsText(name, Charsets.UTF_8.name());
}
// Delete
storage.delete("README_03.txt");
storage.deletePath();
// For advanced usage
MinioClient minioClient = storage.getClient();
// Close manually or use try-with-resources if needed
storage.close();
}Declare queues, exchange and bind routing keys
import ru.smk_systems.svev.mqlib.models.ModuleName;
import ru.smk_systems.svev.mqlib.common.Condition;
public static void main(String[] args) {
Condition.declare(
webClient.getChannel(),
new ModuleName(Arrays.asList( "xml", "broker", "web" )),
"test07"
);
}import ru.smk_systems.svev.mqlib.ModuleClient;
import ru.smk_systems.svev.mqlib.models.ModuleName;
import ru.smk_systems.svev.mqlib.models.Message;
import ru.smk_systems.svev.mqlib.models.MessageFile;
import static ru.smk_systems.svev.mqlib.models.constants.Message.*;
public static void main(String[] args) {
ModuleClient webClient = new ModuleClient(
new ModuleName("web"), "test07",
"amqp://guest:[email protected]:5672"
);
webClient.receiveAsync((deliveryTag, incomingMessage) -> {
System.out.println(webClient.getModuleName() +
" <-- " + incomingMessage.getFrom().getFirst()
);
});
Message newMessage = new Message(webClient.getModuleName());
newMessage.setAction(
BASE
// STATISTICS
// GET_CLIENT_SYSTEM
);
newMessage.setId(UUID.randomUUID());
newMessage.setTo(new ModuleName(Arrays.asList(
"xml", "broker", "web"
)));
newMessage.setBody("`fileid1` and `fileid2` example body");
newMessage.setFiles(Arrays.asList(
new MessageFile("fileid1", "filename1.txt", "text/plain"),
new MessageFile("fileid2", "filename2.txt", "text/plain",
"ftp", "ftp://user:password@localhost",
"/upload",
"filename2.txt")
));
webClient.sendCustomStatus(newMessage, "CREATED", "Message with body: " + newMessage.getBody());
webClient.send(newMessage);
// webClient.closeConnection();
}Асинхронная обработка
import ru.smk_systems.svev.mqlib.ModuleClient;
import ru.smk_systems.svev.mqlib.models.ModuleName;
import ru.smk_systems.svev.mqlib.models.Message;
public static void main(String[] args) {
ModuleClient webClient = new ModuleClient(
new ModuleName("web"), "test07",
"amqp://guest:[email protected]:5672"
);
webClient.setSilent(true); // Without Status messages
webClient.setAutoAck(false);
webClient.receiveAsync((deliveryTag, incomingMessage) -> {
System.out.println(webClient.getModuleName() +
" <-- " + incomingMessage.getFrom().getFirst()
);
if (!webClient.isAutoAck()){
// webClient.sendAck(incomingMessage, deliveryTag);
// webClient.sendNack(incomingMessage, deliveryTag);
// webClient.sendNack(deliveryTag);
webClient.sendReject(deliveryTag);
}
});
}Синхронная обработка
import ru.smk_systems.svev.mqlib.ModuleClient;
import ru.smk_systems.svev.mqlib.models.ModuleName;
import ru.smk_systems.svev.mqlib.models.Message;
public static void main(String[] args) {
ModuleClient webClient = new ModuleClient(
new ModuleName("web"), "test07",
"amqp://guest:[email protected]:5672"
);
webClient.setSilent(true); // Without Status messages
webClient.setAutoAck(false);
while (true) {
MessageGetResponse response = webClient.receive();
if (response == null) break;
Message webIncomingMessage = response.getMessage();
System.out.println(webClient.getModuleName() +
" <-- " + response.getMessage().getFrom().getFirst() +
"\tcreatedBy: " + response.getMessage().getAuthor() +
"\tmessageCount: " + response.getMessageCount()
);
if (!webClient.isAutoAck()) {
webClient.sendAck(
response.getMessage(),
response.getEnvelope().getDeliveryTag()
);
}
webClient.send(response.getMessage());
if (response.getMessageCount() == 0) break;
}
}Если webClient.setSilent(false); webClient.setAutoAck(false); статусы:
RECEIVEDПрилетают в моментsendAck;ERRORв моментsendNackиsendReject;
Статусы, которые учитываюстя в таблице mqlib_status_statistics:
CREATEDRECEIVEDSENDEDERRORWARNING
dependencies {
implementation 'org.postgresql:postgresql:42.7.3'
}import ru.smk_systems.svev.mqlib.StatusClient;
import ru.smk_systems.svev.mqlib.models.Status;
import ru.smk_systems.svev.mqlib.utils.ColorDump;
public static void main(String[] args) {
StatusClient statusClient = new StatusClient(
"test07",
"amqp://guest:[email protected]:5672",
"jdbc:postgresql://hp-trofimov-ubuntu.smk-systems.ru:5432/mqsql?user=mqsql&password=mqsql"
);
statusClient.receiveAndWriteToDbAsync((deliveryTag, incomingStatus) -> {
ColorDump.println(
incomingStatus.toString(),
ColorDump.ANSI_YELLOW
);
});
}import ru.smk_systems.svev.mqlib.ModuleClient;
import ru.smk_systems.svev.mqlib.models.ModuleName;
import ru.smk_systems.svev.mqlib.models.Message;
public static void main(String[] args) {
ModuleClient xmlClient = new ModuleClient(
new ModuleName("xml"), "test07",
"amqp://guest:[email protected]:5672"
);
xmlClient.setPrefetchCount(0); // unlimited
xmlClient.receiveAsync((deliveryTag, incomingMessage) -> {
System.out.println(xmlClient.getModuleName() +
" <-- " + incomingMessage.getFrom().getFirst()
);
xmlClient.send(incomingMessage);
});
}import ru.smk_systems.svev.mqlib.ModuleClient;
import ru.smk_systems.svev.mqlib.models.ModuleName;
import ru.smk_systems.svev.mqlib.models.Message;
import ru.smk_systems.svev.mqlib.models.MessageFile;
public static void main(String[] args) {
ModuleClient brokerClient = new ModuleClient(
new ModuleName("broker"), "test07",
"amqp://guest:[email protected]:5672"
);
brokerClient.receiveAsync((deliveryTag, incomingMessage) -> {
System.out.println(brokerClient.getModuleName() +
" <-- " + incomingMessage.getFrom().getFirst() +
"\tcreatedBy: " + incomingMessage.getAuthor()
);
List<MessageFile> brokerFiles = incomingMessage.getFiles();
System.out.println("messageFile: " + brokerFiles.get(0).getName());
System.out.println("messageFile: " + brokerFiles.get(1).getStorageFilename());
brokerClient.sendCustomStatus(incomingMessage, "BROKER_TEST", "Broker test status message");
brokerClient.send(incomingMessage);
});
// brokerClient.closeConnection();
}import ru.smk_systems.svev.mqlib.ModuleClient;
import ru.smk_systems.svev.mqlib.models.ModuleName;
import ru.smk_systems.svev.mqlib.models.Message;
import ru.smk_systems.svev.mqlib.models.MessageFile;
import ru.smk_systems.svev.mqlib.models.MessageGetResponse;
public static void main(String[] args) {
ModuleClient brokerClient = new ModuleClient(
new ModuleName("broker"), "test07",
"amqp://guest:[email protected]:5672"
);
while (true) {
MessageGetResponse response = brokerClient.receive();
if (response == null) break;
Message brokerIncomingMessage = response.getMessage();
System.out.println(brokerClient.getModuleName() +
" <-- " + brokerIncomingMessage.getFrom().getFirst() +
"\tcreatedBy: " + brokerIncomingMessage.getAuthor() +
"\tmessageCount: " + response.getMessageCount()
);
brokerClient.send(brokerIncomingMessage);
if (response.getMessageCount() == 0) break;
}
}