Перейти к содержимому

Асинхронное программирование в Node.js: Ожидание нескольких событий сразу

04/02/2010

Постановка задачи

Представим себе такую ситуацию. Нам надо сделать поиск в базе Tokyo Tyrant (или любой другой) и получить найденные значения в виде JavaScript-объектов. Проблема состоит в том, что получение объекта из Tokyo Tyrant происходит с коллбеком. Т.е., у нас на руках после поиска оказывается несколько коллбеков — по числу ожидаемых объектов:

var tyrant = require('./tyrant/tyrant');
var sys = require('sys');

tyrant.connect();

tyrant.addListener('connect', function() {
    tyrant.search(tyrant.is('type', 'blog'), tyrant.sort('time', 'desc')).addCallback(function(value) {

        var posts = [];
        for (item in value) {
            var page_id = value[item];
            tyrant.get(page_id).addCallback(function(raw_item) {
                sys.puts('Got record #' + page_id + ': ' + JSON.stringify(raw_item));
            });
        }
    });
});

Изначальное моё решение было простым (и неправильным): я объявил массив posts и наполнял его прямо из promises, ожидая их завершения с помощью wait(). Это работало, но, во-первых, противоречило идее асинхронной работы программы, и во-вторых, сломалось при первой же проверке на прочность с помощью ab: я получал кучу сообщений WARNING: promise.wait() is being called too often и итоговый сегфолт. Посмотрев в группе по этому поводу, нашёл только призыв Райана не пользоваться wait() вообще. Что ж, нужен был другой подход.

Callback-Комбо

Нам нужно каким то образом собрать полученные объекты и вызвать с ними функцию только когда все записи будут получены. Сама Node.js не предоставляет стандартных средств для такой задачи, но Tim Caswell написал маленькую функцию, позволяющую довольно просто собирать несколько событий в один event.

// combo library 
    function Combo(callback) { 
      this.callback = callback; 
      this.items = 0; 
      this.results = []; 
    } 
    Combo.prototype = { 
      add: function () { 
        var self = this; 
        this.items++; 
        return function () { 
          self.check(self.items - 1, arguments); 
        }; 
      }, 
      check: function (id, arguments) { 
        this.results[id] = arguments; 
        this.items--; 
        if (this.items == 0) { 
          this.callback.apply(this, this.results); 
        } 
      } 
    }; 


    // Пример использования
    var both = new Combo(function () { 
      // Сперва объявляем callback. Он просто выведет переданные аргументы
      puts(inspect(arguments)); 
    });
    // Ожидаем первое событие
    setTimeout(both.add(), 100);
    // ...и одновременно ожидаем второе 
    setTimeout(both.add(), 50); 

Я использовал её для сбора записей, получаемых из Tokyo Tyrant, но для этого мне пришлось её слегка изменить. Тим использовал в методе check функцию callback.apply. Т.к. у нас к моменту запуска коллбека будет массив значений, apply этот массив разделит на отдельные аргументы. То есть, если мы создадим Combo с функцией gather(), и получим пять результатов, callback.apply вызовет gather с пятью параметрами, которые нам придётся перебирать в коде. Поэтому я заменил callback.apply на callback.call:

function Combo(callback) {
  this.callback = callback;
  this.items = 0;
  this.results = [];
}
Combo.prototype = {
  add: function () {
    sys.puts('Adding promise to combo');
    var self = this;
    this.items++;
    return function () {
      sys.puts('Combo part fired with ' + (self.items - 1) + ' id, arguments is ' + JSON.stringify(arguments));
      self.check(self.items - 1, arguments);
    };
  },
  check: function (id, arguments_in) {
    this.results[id] = arguments_in;
    this.items--;
    if (this.items == 0) {
      sys.puts('Combo is approaching the end, resultset count is ' + this.results.length);
      this.callback.call(this, this.results);
    }
  }
};

var tyrant = require('./tyrant/tyrant');
var sys = require('sys');

tyrant.connect();

tyrant.addListener('connect', function() {
    tyrant.search(tyrant.is('type', 'blog'), tyrant.sort('time', 'desc')).addCallback(function(value) {

        var gatherer = new Combo(function(posts) {
            sys.puts('Got all records :' + JSON.stringify(posts));
        });

        for (item in value) {
            var page_id = value[item];
            tyrant.get(page_id).addCallback(gatherer.add());
        }
    });
});

После этой модификации всё отлично работает, проверка с помощью ab больше не убивает сайт и время выполнения запроса довольно стабильно.

10 комментариев
  1. Вообще конкретно для этого случая лучше использовать getlist(), чтобы получить список объектов, айдишники которых были найдены в search().

    Что касается функкции Combo: меня смущает setTimeout(). Зачем он там?

    • Если имеется в виду второй листинг, то это пример использования. Ставятся два таймаута, и Combo вызывается когда завершатся оба.

      • Про таймаут — не заметил комментарий. Прошу прощения.

        • Да там и комментарий сначала был невнятный. Задокументировал пример получше, спасибо что указали 🙂

  2. Да, кстати, как сильно у вас отличаются результаты ab, для случаев, когда поиск в Tyrant используется с сортировкой и без нее?

    • Естественно, в обоих случаях на поле, по которому происходит сортировка, должен быть установлен индекс.

    • Хм, не мерял. Виртуальная машина у меня на работе, завтра замеряю и напишу. Я всё равно собирался мерять быстродействие шаблонизатора.

  3. zit permalink

    прикольная штука для «нумерованных» событий, возьму на заметку. я вот такой пользуюсь http://ru-js-code-utils.googlecode.com/files/_on.js — суть несколько другая, отслеживание нескольких разных событий по строчкам.

Trackbacks & Pingbacks

  1. Асинхронное программирование в Node.js: Ожидание нескольких событий сразу « nodeJS

Оставьте комментарий