your programing

예제 별 RabbitMQ : 다중 스레드, 채널 및 대기열

lovepro 2021. 1. 5. 19:48
반응형

예제 별 RabbitMQ : 다중 스레드, 채널 및 대기열


방금 RabbitMQ의 Java API 문서를 읽었 으며 매우 유익하고 간단합니다. Channel게시 / 소비를위한 간단한 설정 방법에 대한 예제는 이해하기 매우 쉽습니다. 그러나 이것은 매우 간단하고 기본적인 예이며 중요한 질문을 남겼 습니다. 1+ Channels를 여러 대기열에 게시 / 소비하도록 설정하려면 어떻게해야 합니까?

하자 내가 그것을 3 큐와 RabbitMQ 서버가 말 : logging, security_eventscustomer_orders. 따라서 Channel3 개의 대기열 모두에 게시 / 소비 할 수 있는 단일 항목 필요 하거나 Channels, 각각 단일 대기열 전용 인 3 개의 개별 .

또한 RabbitMQ의 모범 사례에 Channel따르면 소비자 스레드 당 1 개를 설정해야 합니다. 이 예를 들어,하자의 말은 security_events단 한 소비자 스레드 괜찮지 만, logging그리고 customer_order둘 필요를 볼륨을 처리하기 위해 5 개 스레드. 따라서 내가 올바르게 이해한다면 다음이 필요하다는 의미입니까?

  • Channel게시 / 소비를위한 1 및 1 소비자 스레드 security_events;
  • Channels게시 / 소비를위한 5 및 5 개의 소비자 스레드 logging;
  • Channels게시 / 소비를위한 5 및 5 개의 소비자 스레드 customer_orders?

내 이해가 여기에서 잘못 이해되면 나를 수정하여 시작하십시오. 어느 쪽이든, 전투에 지친 RabbitMQ 베테랑이 여기 내 요구 사항을 충족하는 게시자 / 소비자를 설정하기위한 적절한 코드 예제로 "점을 연결"하는 데 도움을 줄 수 있습니까? 미리 감사드립니다!


초기 이해에 몇 가지 문제가 있다고 생각합니다. 솔직히 다음을보고 약간 놀랐습니다 both need 5 threads to handle the volume.. 정확한 번호가 필요한지 어떻게 확인 했습니까? 5 개의 스레드가 충분하다는 보장이 있습니까?

RabbitMQ는 조정되고 시간 테스트를 거쳤으므로 적절한 설계와 효율적인 메시지 처리에 관한 것입니다.

문제를 검토하고 적절한 해결책을 찾아 보겠습니다. BTW, 메시지 대기열 자체는 정말 좋은 솔루션이 있다는 보장을 제공하지 않습니다. 수행중인 작업을 이해하고 추가 테스트를 수행해야합니다.

확실히 아시다시피 가능한 많은 레이아웃이 있습니다.

여기에 이미지 설명 입력

생산자 소비자 문제 B를 설명하는 가장 간단한 방법으로 레이아웃 사용하겠습니다 . 처리량에 대해 너무 걱정하고 있기 때문입니다. BTW, RabbitMQ는 꽤 잘 작동합니다 ( source ). 에주의 하십시오. 나중에 설명하겠습니다.1NprefetchCount

여기에 이미지 설명 입력

따라서 메시지 처리 로직은 충분한 처리량을 확보 할 수있는 올바른 장소 일 것입니다. 당연히 메시지를 처리해야 할 때마다 새 스레드를 확장 할 수 있지만 결국 이러한 접근 방식은 시스템을 죽일 것입니다. 기본적으로 스레드가 많을수록 지연 시간이 더 커집니다 ( 원하는 경우 Amdahl의 법칙을 확인할 수 있습니다 ).

여기에 이미지 설명 입력

( 그림의 Amdahl의 법칙 참조 )

팁 # 1 : 스레드에주의하고 ThreadPools를 사용 하십시오 ( 세부 사항 ).

스레드 풀은 실행 가능한 개체 (작업 대기열)의 모음과 실행중인 스레드의 연결로 설명 할 수 있습니다. 이러한 스레드는 지속적으로 실행되고 작업 쿼리에서 새 작업을 확인합니다. 수행 할 새 작업이 있으면이 Runnable을 실행합니다. Thread 클래스 자체는 작업 대기열에 새 Runnable 객체를 추가하는 방법, 예를 들어 execute (Runnable r)을 제공합니다.

public class Main {
  private static final int NTHREDS = 10;

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
    for (int i = 0; i < 500; i++) {
      Runnable worker = new MyRunnable(10000000L + i);
      executor.execute(worker);
    }
    // This will make the executor accept no new threads
    // and finish all existing threads in the queue
    executor.shutdown();
    // Wait until all threads are finish
    executor.awaitTermination();
    System.out.println("Finished all threads");
  }
} 

팁 # 2 : 메시지 처리 오버 헤드에주의하세요

나는 이것이 명백한 최적화 기술이라고 말할 것입니다. 작고 처리하기 쉬운 메시지를 보낼 가능성이 높습니다. 전체 접근 방식은 지속적으로 설정되고 처리되는 작은 메시지에 관한 것입니다. 큰 메시지는 결국 나쁜 농담이 될 것이므로 그것을 피하는 것이 좋습니다.

여기에 이미지 설명 입력

따라서 작은 정보를 보내는 것이 낫습니다.하지만 처리는 어떻습니까? 작업을 제출할 때마다 오버 헤드가 발생합니다. 수신 메시지 비율이 높은 경우 일괄 처리가 매우 유용 할 수 있습니다.

여기에 이미지 설명 입력

예를 들어, 간단한 메시지 처리 로직이 있고 메시지가 처리 될 때마다 스레드 특정 오버 헤드를 원하지 않는다고 가정 해 보겠습니다. 매우 간단하게 최적화하려면 CompositeRunnable can be introduced:

class CompositeRunnable implements Runnable {

    protected Queue<Runnable> queue = new LinkedList<>();

    public void add(Runnable a) {
        queue.add(a);
    }

    @Override
    public void run() {
        for(Runnable r: queue) {
            r.run();
        }
    }
}

또는 처리 할 메시지를 수집하여 약간 다른 방식으로 동일한 작업을 수행합니다.

class CompositeMessageWorker<T> implements Runnable {

    protected Queue<T> queue = new LinkedList<>();

    public void add(T message) {
        queue.add(message);
    }

    @Override
    public void run() {
        for(T message: queue) {
            // process a message
        }
    }
}

이러한 방식으로 메시지를보다 효과적으로 처리 할 수 ​​있습니다.

팁 # 3 : 메시지 처리 최적화

메시지를 병렬로 Tip #1처리하고 ( ) 처리 오버 헤드를 줄일 수 있다는 사실을 알고 있음에도 ( Tip #2) 모든 것을 빠르게 처리 해야합니다. 중복 처리 단계, 무거운 루프 등은 성능에 많은 영향을 미칠 수 있습니다. 흥미로운 사례 연구를 참조하십시오.

여기에 이미지 설명 입력

올바른 XML 구문 분석기를 선택하여 메시지 큐 처리량 10 배 향상

팁 # 4 : 연결 및 채널 관리

  • 기존 연결에서 새 채널을 시작하려면 한 번의 네트워크 왕복이 필요합니다. 새 연결을 시작하려면 여러 번 걸립니다.
  • 각 연결은 서버의 파일 설명자를 사용합니다. 채널은 그렇지 않습니다.
  • 한 채널에 큰 메시지를 게시하면 나가는 동안 연결이 차단됩니다. 그 외에는 멀티플렉싱이 상당히 투명합니다.
  • 게시중인 연결은 서버가 과부하되면 차단 될 수 있습니다. 게시 및 사용 연결을 분리하는 것이 좋습니다.
  • 메시지 버스트 처리 준비

( 출처 )

모든 팁은 완벽하게 함께 작동합니다. 추가 세부 정보가 필요하면 언제든지 알려주세요.

완전한 소비자 예 ( 소스 )

다음 사항에 유의하십시오.

  • channel.basicQos (prefetch) -앞에서 본 것처럼 prefetchCount매우 유용 할 수 있습니다.

    This command allows a consumer to choose a prefetch window that specifies the amount of unacknowledged messages it is prepared to receive. By setting the prefetch count to a non-zero value, the broker will not deliver any messages to the consumer that would breach that limit. To move the window forwards, the consumer has to acknowledge the receipt of a message (or a group of messages).

  • ExecutorService threadExecutor - you can specify properly configured executor service.

Example:

static class Worker extends DefaultConsumer {

    String name;
    Channel channel;
    String queue;
    int processed;
    ExecutorService executorService;

    public Worker(int prefetch, ExecutorService threadExecutor,
                  , Channel c, String q) throws Exception {
        super(c);
        channel = c;
        queue = q;
        channel.basicQos(prefetch);
        channel.basicConsume(queue, false, this);
        executorService = threadExecutor;
    }

    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        Runnable task = new VariableLengthTask(this,
                                               envelope.getDeliveryTag(),
                                               channel);
        executorService.submit(task);
    }
}

You can also check the following:


How can I set up 1+ Channels to publish/consume to and from multiple queues?

You can implement using threads and channels. All you need is a way to categorize things, ie all the queue items from the login, all the queue elements from security_events etc. The catagorization can be achived using a routingKey.

ie: Every time when you add an item to the queue u specify the routing key. It will be appended as a property element. By this you can get the values from a particular event say logging.

The following Code sample explain how you make it done in client side.

Eg:

The routing key is used identify the type of the channel and retrive the types.

For example if you need to get all the channels about the type Login then you must specify the routing key as login or some other keyword to identify that.

            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            string routingKey="login";

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

You can Look here for more details about the Categorization ..


스레드 부분

게시 부분이 끝나면 스레드 부분을 실행할 수 있습니다.

이 부분에서는 카테고리를 기준으로 게시 된 데이터를 얻을 수 있습니다. 즉; 귀하의 경우에 로깅, security_events 및 customer_orders 등의 라우팅 키.

스레드에서 데이터를 검색하는 방법을 알아 보려면 예제를보십시오.

예 :

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
//**The threads part is as follows** 
 channel.exchangeDeclare(EXCHANGE_NAME, "direct");      
 String queueName = channel.queueDeclare().getQueue();
    // This part will biend the queue with the severity (login for eg:)
    for(String severity : argv){
              channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
    }
    boolean autoAck = false;
    channel.basicConsume(queueName, autoAck, "myConsumerTag",
    new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag,
                                Envelope envelope,
                                AMQP.BasicProperties properties,
                                byte[] body)
         throws IOException
     {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.contentType;
             long deliveryTag = envelope.getDeliveryTag();

             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
     }
 });

이제 login (routing key) 유형의 Queue에있는 Data를 처리하는 스레드가 생성됩니다. 이 방법으로 여러 스레드를 만들 수 있습니다. 각각 다른 목적을 제공합니다.

보면 여기 스레드 부분에 대한 자세한 내용은 ..

참조 URL : https://stackoverflow.com/questions/18531072/rabbitmq-by-example-multiple-threads-channels-and-queues

반응형