Skip to content

moalex/mqlib

Repository files navigation

Библиотека состоит из трех функциональных модулей:

  1. Клиентский модуль ModuleClient Принимает и оправляет сообщения другим клиентам

  2. Модуль статусов и статистики StatusClient Записывает сообщения об ошибках или успешных приемах/отправках сообщений Клиентских модулей

  3. Подключение к хранилищу StorageConnection Загрузка и выгрузка файлов Min.io

Клиентский модуль ModuleClient

Принимает и оправляет сообщения другим клиентам

Инициализация Клиентского модуля

Gradle

repositories {
    maven { url "https://nexus.smk-systems.ru/repository/common-utils/" }
}

dependencies {
    implementation 'com.rabbitmq:amqp-client:5.20.0'
    implementation 'ru.smk-systems.svev:mqlib:1.1.4'
}

Java

Создаём client (экземпляр класса ModuleClient).

import ru.smk_systems.svev.mqlib.ModuleClient;
import ru.smk_systems.svev.mqlib.models.ModuleName;

ModuleClient сlient = new ModuleClient(
    new ModuleName("web"), "5machines",
    "amqp://guest:[email protected]:5672"
);

web - имя текущего модуля (экземпляр класса ModuleName);

5machines - префикс (контур, к которому планируется подключение)

amqp://guest:[email protected]:5672 - ссылка, содержащая учетные данные, домен и порт сервера RabbitMQ;


Строка mqlib-queue-, Префикс и имя текущего модуля составляют собой имя очереди, из которой client (с помощью методов client.receive() и client.receiveAsync()) забирает сообщения. Например: mqlib-queue-5machines-broker-01

Очереди декларируются автоматически в момент в выполнения методов client.receive() или client.receiveAsync(). Если послать сообщение клиентскому модулю короый не разу не выполнял методы client.receive() или client.receiveAsync(), то сообщение уйдет в не существующую очередь и потеряется. Для того чтобы избежать таких ситуации предусмотрен статический метод Condition.declare()

В примере декларируются очереди для нескольких Клиентских модулей сразу

import ru.smk_systems.svev.mqlib.models.ModuleName;
import ru.smk_systems.svev.mqlib.common.Condition;

    public static void main(String[] args) {
        Condition.declare(
                client.getChannel(),
                new ModuleName(Arrays.asList( "xml", "broker", "web" )),
                "5machines"
        );
    }

Приём сообщений

Синхронный (последовательный) прием client.receive()

import ru.smk_systems.svev.mqlib.models.MessageGetResponse;
import ru.smk_systems.svev.mqlib.models.Message;

while (true) {
    MessageGetResponse response = client.receive();
    if (response == null) break;

    Message message = response.getMessage();
    System.out.println(message.getBody() + "\n\nmessageCount: " + response.getMessageCount());
    
    if (response.getMessageCount() == 0) break;
}

client.receive() - возвращает экземпляр класса MessageGetResponse.

response.getMessage() - возвращает сообщение по-одному (класс Message)

response.getMessageCount() - сколько еще сообщений осталось. при response.getMessageCount() == 0 выходим из цикла.

Асинхронный (последовательный) прием client.receiveAsync()

import ru.smk_systems.svev.mqlib.models.Message;

client.receiveAsync((deliveryTag, incomingMessage) -> {
    System.out.println(incomingMessage.getBody());
});

client.receiveAsync() cоздаёт отдельный поток, который завершается вместе с завышением основного потока приложения

incomingMessage экземпляр класса Message;

Отправка сообщений

Модуль статусов и статистики StatusClient

Записывает сообщения об ошибках или успешных приемах/отправках сообщений Клиентских модулей.

Подключение к хранилищу StorageConnection

Загрузка и выгрузка файлов Min.io






Links

build.gradle

repositories {
    maven { url "https://nexus.smk-systems.ru/repository/common-utils/" }
}

dependencies {
    implementation 'com.rabbitmq:amqp-client:5.20.0'
    implementation 'org.postgresql:postgresql:42.7.3' // Only for Status client
    implementation 'io.minio:minio:8.5.11'        // Storage client
    implementation 'io.minio:minio-admin:8.5.11'  // Storage client
    
    implementation 'ru.smk-systems.svev:mqlib:1.1.4'
}

Client Examples

Storage

Links

build.gradle

dependencies {
    implementation 'io.minio:minio:8.5.11'
    implementation 'io.minio:minio-admin:8.5.11'
}
import ru.smk_systems.svev.mqlib.common.StorageConnection;

    public static void main(String[] args) {
        // StorageConnection storage = new StorageConnection("http://minio:[email protected]:9000"); // Bucket Name: mqlib
        StorageConnection storage = new StorageConnection("http://minio:[email protected]:9000", "mqlib");
    
        // {@link StorageConnection#path} Autogenerated by constructor
        // Set current path
        storage.setPath(message.getId());
        //   or
        storage.setPath(UUID.fromString("63ffd87a-daf0-4571-b288-403063e581b5"));
    
        // Upload
        storage.upload("README_01.txt", "Text content in UTF-8");
        storage.upload("README_02.txt", new File("../README.md"));
        storage.upload("README_03.txt", new InputStream());
        storage.upload("README_04.txt", new Byte[]{});
        
        // Download
        List<String> filenames = storage.list();
        
        for(String name : filenames) {
            URL link = storage.getLink(name); // like that http://minio:[email protected]:9000/mqlib/63ffd...
            InputStream stream = storage.get(name);
            String content = storage.getAsText(name, Charsets.UTF_8.name());
        }
        
        // Delete
        storage.delete("README_03.txt");
        storage.deletePath();
        
        // For advanced usage
        MinioClient minioClient = storage.getClient();
        
        // Close manually or use try-with-resources if needed
        storage.close();
    }

Batch Declaring

Declare queues, exchange and bind routing keys

import ru.smk_systems.svev.mqlib.models.ModuleName;
import ru.smk_systems.svev.mqlib.common.Condition;

    public static void main(String[] args) {
        Condition.declare(
                webClient.getChannel(),
                new ModuleName(Arrays.asList( "xml", "broker", "web" )),
                "test07"
        );
    }

WebClient

import ru.smk_systems.svev.mqlib.ModuleClient;
import ru.smk_systems.svev.mqlib.models.ModuleName;
import ru.smk_systems.svev.mqlib.models.Message;
import ru.smk_systems.svev.mqlib.models.MessageFile;
import static ru.smk_systems.svev.mqlib.models.constants.Message.*;

    public static void main(String[] args) {
        ModuleClient webClient = new ModuleClient(
            new ModuleName("web"), "test07",
            "amqp://guest:[email protected]:5672"
        );

        webClient.receiveAsync((deliveryTag, incomingMessage) -> {
            System.out.println(webClient.getModuleName() +
                " <-- " + incomingMessage.getFrom().getFirst()
            );
        });

        Message newMessage = new Message(webClient.getModuleName());
        newMessage.setAction(
                BASE
//                STATISTICS
//                GET_CLIENT_SYSTEM
        );
        newMessage.setId(UUID.randomUUID());
        newMessage.setTo(new ModuleName(Arrays.asList(
                "xml", "broker", "web"
        )));

        newMessage.setBody("`fileid1` and `fileid2` example body");
        newMessage.setFiles(Arrays.asList(
            new MessageFile("fileid1", "filename1.txt", "text/plain"),

            new MessageFile("fileid2", "filename2.txt", "text/plain",
                                "ftp", "ftp://user:password@localhost",
                                "/upload",
                                "filename2.txt")
        ));

        webClient.sendCustomStatus(newMessage, "CREATED", "Message with body: " + newMessage.getBody());

        webClient.send(newMessage);
//        webClient.closeConnection();
    }

Without AutoAck

Асинхронная обработка

import ru.smk_systems.svev.mqlib.ModuleClient;
import ru.smk_systems.svev.mqlib.models.ModuleName;
import ru.smk_systems.svev.mqlib.models.Message;

    public static void main(String[] args) {
        ModuleClient webClient = new ModuleClient(
            new ModuleName("web"), "test07",
            "amqp://guest:[email protected]:5672"
        );
        
        webClient.setSilent(true); // Without Status messages
        webClient.setAutoAck(false);
        
        webClient.receiveAsync((deliveryTag, incomingMessage) -> {
            System.out.println(webClient.getModuleName() +
                " <-- " + incomingMessage.getFrom().getFirst()
            );
            
            if (!webClient.isAutoAck()){
//                webClient.sendAck(incomingMessage, deliveryTag);
//                webClient.sendNack(incomingMessage, deliveryTag);
//                webClient.sendNack(deliveryTag);
                webClient.sendReject(deliveryTag);
            }
        });
    }

Синхронная обработка

import ru.smk_systems.svev.mqlib.ModuleClient;
import ru.smk_systems.svev.mqlib.models.ModuleName;
import ru.smk_systems.svev.mqlib.models.Message;

    public static void main(String[] args) {
        ModuleClient webClient = new ModuleClient(
            new ModuleName("web"), "test07",
            "amqp://guest:[email protected]:5672"
        );

        webClient.setSilent(true); // Without Status messages
        webClient.setAutoAck(false);

        while (true) {
            MessageGetResponse response = webClient.receive();
            if (response == null) break;

            Message webIncomingMessage = response.getMessage();
            System.out.println(webClient.getModuleName() +
                    " <-- " + response.getMessage().getFrom().getFirst() +
                    "\tcreatedBy: " + response.getMessage().getAuthor() +
                    "\tmessageCount: " + response.getMessageCount()
            );
            if (!webClient.isAutoAck()) {
                webClient.sendAck(
                        response.getMessage(),
                        response.getEnvelope().getDeliveryTag()
                );
            }
            webClient.send(response.getMessage());

            if (response.getMessageCount() == 0) break;
        }
    }

Если webClient.setSilent(false); webClient.setAutoAck(false); статусы:

  • RECEIVED Прилетают в момент sendAck;
  • ERROR в момент sendNack и sendReject;

StatusListener

Статусы, которые учитываюстя в таблице mqlib_status_statistics:

  • CREATED
  • RECEIVED
  • SENDED
  • ERROR
  • WARNING

build.gradle

dependencies {
    implementation 'org.postgresql:postgresql:42.7.3'
}

Reset table

database-clean.sql

import ru.smk_systems.svev.mqlib.StatusClient;
import ru.smk_systems.svev.mqlib.models.Status;
import ru.smk_systems.svev.mqlib.utils.ColorDump;

    public static void main(String[] args) {
        StatusClient statusClient = new StatusClient(
                "test07",
                "amqp://guest:[email protected]:5672",
                "jdbc:postgresql://hp-trofimov-ubuntu.smk-systems.ru:5432/mqsql?user=mqsql&password=mqsql"
        );

        statusClient.receiveAndWriteToDbAsync((deliveryTag, incomingStatus) -> {
            ColorDump.println(
                incomingStatus.toString(),
                ColorDump.ANSI_YELLOW
            );
        });
    }

XmlClient

import ru.smk_systems.svev.mqlib.ModuleClient;
import ru.smk_systems.svev.mqlib.models.ModuleName;
import ru.smk_systems.svev.mqlib.models.Message;

    public static void main(String[] args) {
        ModuleClient xmlClient = new ModuleClient(
            new ModuleName("xml"), "test07",
            "amqp://guest:[email protected]:5672"
        );
        
        xmlClient.setPrefetchCount(0); // unlimited

        xmlClient.receiveAsync((deliveryTag, incomingMessage) -> {
            System.out.println(xmlClient.getModuleName() +
                " <-- " + incomingMessage.getFrom().getFirst()
            );

            xmlClient.send(incomingMessage);
        });
    }

BrokerClient

import ru.smk_systems.svev.mqlib.ModuleClient;
import ru.smk_systems.svev.mqlib.models.ModuleName;
import ru.smk_systems.svev.mqlib.models.Message;
import ru.smk_systems.svev.mqlib.models.MessageFile;

    public static void main(String[] args) {
        ModuleClient brokerClient = new ModuleClient(
            new ModuleName("broker"), "test07",
            "amqp://guest:[email protected]:5672"
        );

        brokerClient.receiveAsync((deliveryTag, incomingMessage) -> {
            System.out.println(brokerClient.getModuleName() +
                " <-- " + incomingMessage.getFrom().getFirst() +
                "\tcreatedBy: " + incomingMessage.getAuthor()
            );

            List<MessageFile> brokerFiles = incomingMessage.getFiles();
            System.out.println("messageFile: " + brokerFiles.get(0).getName());
            System.out.println("messageFile: " + brokerFiles.get(1).getStorageFilename());

            brokerClient.sendCustomStatus(incomingMessage, "BROKER_TEST", "Broker test status message");

            brokerClient.send(incomingMessage);
        });
        
        // brokerClient.closeConnection();
    }

Sync

import ru.smk_systems.svev.mqlib.ModuleClient;
import ru.smk_systems.svev.mqlib.models.ModuleName;
import ru.smk_systems.svev.mqlib.models.Message;
import ru.smk_systems.svev.mqlib.models.MessageFile;
import ru.smk_systems.svev.mqlib.models.MessageGetResponse;

    public static void main(String[] args) {
        ModuleClient brokerClient = new ModuleClient(
            new ModuleName("broker"), "test07",
            "amqp://guest:[email protected]:5672"
        );
        while (true) {
            MessageGetResponse response = brokerClient.receive();
            if (response == null) break;

            Message brokerIncomingMessage = response.getMessage();
            System.out.println(brokerClient.getModuleName() +
                    " <-- " + brokerIncomingMessage.getFrom().getFirst() +
                    "\tcreatedBy: " + brokerIncomingMessage.getAuthor() + 
                    "\tmessageCount: " + response.getMessageCount()
            );

            brokerClient.send(brokerIncomingMessage);
            
            if (response.getMessageCount() == 0) break;
        }
    }

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 3

  •  
  •  
  •  

Languages