Одиночний споживач, що виробляє, тепер потребує декількох споживачів

У мене є ситуація, коли я написав просту модель споживача Producer для читання в шматочках даних з Bluetooth, то кожні 10k байт я пишу, що до файлу. Я використав стандартну модель P-C, використовуючи вектор як власний власник повідомлення. Тож як я можу змінити це, щоб користувачі з декількох потоків могли читати ті самі повідомлення, я думаю, що цей термін буде Multicaster? Я насправді використовую це на телефоні Android, тому JMS, ймовірно, не є варіантом.

static final int MAXQUEUE = 50000; 
private Vector messages = new Vector(); 

/**
 * Put the message in the queue for the Consumer Thread
 */
private synchronized void putMessage(byte[] send) throws InterruptedException { 

    while ( messages.size() == MAXQUEUE ) 
        wait(); 
    messages.addElement( send ); 
    notify(); 
} 


/**
 * This method is called by the consumer to see if any messages in the queue
 */
public synchronized byte[] getMessage()throws InterruptedException { 
    notify(); 
    while ( messages.size() == 0 && !Thread.interrupted()) {
        wait(1); 
    }
    byte[] message = messages.firstElement(); 
    messages.removeElement( message ); 
    return message; 
} 

Я посилаюся на код з книги Oreilly Parser повідомлення

1

3 Відповіді

Паб-під-механізм, безумовно, є способом досягнення того, що ви хочете. Я не знаю, чому розробка для Android обмежуватиме вас використанням JMS, що настільки ж простий, як і специфікація. Перевіряти цей потік на SO.

1
додано
Так, але це в основному говорить про використання JMS, який є свого роду шлях за бортом для того, що мені потрібно на телефонній програмі, відправляючи повідомлення між потоками.
додано Автор JPM, джерело
У такому випадку просте обгортання вашої векторної (або черги, як запропоновано Ханно Біндером) у моделі спостерігача/спостереження робить?
додано Автор mazaneicha, джерело

Ви обов'язково повинні використовувати чергу замість Vector !

Дайте кожній ниві свою чергу і, коли з'явиться нове повідомлення, add() - нове повідомлення до черги кожної нитки. Для гнучкості шаблон слухача може бути корисним.

Редагувати:

Добре, я відчуваю, що повинен додати ще й приклад:
(Класична модель спостерігача)

Це інтерфейс, який повинні втілювати всі споживачі:

public interface MessageListener {
  public void newMessage( byte[] message );
}

Виробник може виглядати наступним чином:

public class Producer {
  Collection listeners = new ArrayList();


 //Allow interested parties to register for new messages
  public void addListener( MessageListener listener ) {
    this.listeners.add( listener );
  }

  public void removeListener( Object listener ) {
    this.listeners.remove( listener );
  }

  protected void produceMessages() {
    byte[] msg = new byte[10];

   //Create message and put into msg

   //Tell all registered listeners about the new message:
    for ( MessageListener l : this.listeners ) {
      l.newMessage( msg );
    }

  }
}

І споживчий клас може бути (використовуючи чергу блокування, яка робить все, що wait() ing і notify() ing для нас):

public class Consumer implements MessageListener {

  BlockingQueue< byte[] > queue = new LinkedBlockingQueue< byte[] >();

 //This implements the MessageListener interface:
  @Override
  public void newMessage( byte[] message ) {
    try {
      queue.put( message );
    } catch (InterruptedException e) {
       //won't happen.
    }
  }

   //Execute in another thread:       
    protected void handleMessages() throws InterruptedException {
        while ( true ) {
            byte[] newMessage = queue.take();

           //handle the new message.
        }
    }


}
0
додано
І Вектор, і черга можуть містити лише об'єкти, а не примати. Використовуйте блокування черги, і ваш весь код вище стає так просто, як queue.put (повідомлення) і queue.take ().
додано Автор JimmyB, джерело
Я нахиляюсь більше до вашого прикладу, тому що я не можу отримати SettableFuture для роботи.
додано Автор JPM, джерело
Я завжди обробляв щось на зразок цього, а Служби навіть написали цілу систему слухача графічного інтерфейсу для отримання даних в реальному часі з системи JMS та оновлення користувацьких компонентів за допомогою цих змін у стані.
додано Автор JPM, джерело

Це те, що я придумав як приклад, копаючи через деякий код та модифікацію деяких існуючих прикладів.

package test.messaging;

import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;

public class TestProducerConsumers {

    static Broker broker;

    public TestProducerConsumers(int maxSize) {
        broker = new Broker(maxSize);
        Producer p = new Producer();
        Consumer c1 = new Consumer("One");
        broker.consumers.add(c1);
        c1.start();

        Consumer c2 = new Consumer("Two");
        broker.consumers.add(c2);
        c2.start();

        p.start();
    }

   //Test Producer, use your own message producer on a thread to call up
   //broker.insert() possibly passing it the message instead.
    class Producer extends Thread {

        @Override
        public void run() {
            while (true) {
                try {
                    broker.insert();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Consumer extends Thread {
        String myName;
        LinkedBlockingQueue queue;

        Consumer(String m) {
            this.myName = m;
            queue = new LinkedBlockingQueue();
        }

        @Override
        public void run() {
            while(!Thread.interrupted()) {
                try {
                    while (queue.size() == 0 && !Thread.interrupted()) {
                        ;
                    }
                    while (queue.peek() == null && !Thread.interrupted()) {
                        ;
                    }
                    System.out.println("" + myName + " Consumer: " + queue.poll());
                } catch (Exception e) { }
            }
        }
    }

    class Broker {
        public ArrayList consumers = new ArrayList();

        int n;
        int maxSize;

        public Broker(int maxSize) {
            n = 0;
            this.maxSize = maxSize;
        }

        synchronized void insert() throws InterruptedException {
                   //only here for testing don't want it to runaway and 
                    //memory leak, only testing first 100 samples.
            if (n == maxSize)
                wait();
            System.out.println("Producer: " + n++);
            for (Consumer c : consumers) {
                c.queue.add("Message " + n);
            }
        }

    }

    public static void main(String[] args) {
        TestProducerConsumers pc = new TestProducerConsumers(100);

    }
}
0
додано
НЕ використовуйте ці в той час (...) цикли в споживача для зайнятого очікування! Це справді погане кодування. блокована черга оброблятиме все це для вас, включаючи wait() ing в insert() . (Де все-таки відповідний виклик notify() "?
додано Автор JimmyB, джерело
Тож, що з цим "якщо (n == maxSize) чекати ();"?
додано Автор JimmyB, джерело
Вам не потрібно повідомляти про вставках, оскільки споживачі не використовують очікування() "Поки існують для тестування". Я кинув кілька сну замість того, щоб чекати споживача, тому що мої повідомлення не потрібні в режимі реального часу.
додано Автор JPM, джерело
Знову тестувало лише 100 повідомлень
додано Автор JPM, джерело
ІТ КПІ - Java
ІТ КПІ - Java
436 учасників

android_jobs_ua
android_jobs_ua
120 учасників

Публикуем вакансии и запросы на поиск работы по направлению Android. Здесь всё: full-time, part-time, remote и разовые подработки.

Mobile Dev Jobs UA
Mobile Dev Jobs UA
20 учасників

Публикуем вакансии и запросы на поиск работы по направлению iOS, Android, Xamarin, RN и т.д.