余白

https://blog.lacolaco.net/ に移転しました

C#でTwitterアプリを作る 第3回 Streaming

第2回ではタイムラインの取得やツイートの投稿について紹介しました。
今回はツイッタークライアントを作るならもはや必須のStreamingAPIをCoreTweetから利用してみます。

ところで、第0回でも触れましたが、CoreTweetにはStreamingAPIの非同期処理をReactiveExtentionsで記述できる、CoreTweet.Streaming.Rxという拡張ライブラリが存在します。
もちろんCoreTweet.Streaming.Rxを用いなくてもLINQを使って同期的にStreamingAPIを使えるようになっているので、どちらも紹介します。

CoreTweetのみでStreaming

CoreTweet標準のStreamingApiクラスは、StreamingApi.StartStreamメソッドで接続します。

var stream = tokens.Streaming.StartStream(CoreTweet.Streaming.StreamingType.User,
                new StreamingParameters(replies => "all"));

StartStreamの引数は

  • StreamingType( User, Public, Sample etc.)
  • StreamingParameters

の2つですが、第2引数は省略可能です。
StreamingParameters第1回に紹介したパラメータと同様に、複数の方法でパラメータを指定できます。

ところで、StartStreamの戻り値の型は何かというと、IEnumerable<StreamingMessage>です。つまり、ここから先はLINQで記述できるということです。嬉しいですね!
UserStreamに流れてきたツイートとイベントをコンソールに出力するサンプルはこちら

foreach(var message in stream)
{
    if(message is StatusMessage)
    {
        var status = (message as StatusMessage).Status;
        Console.WriteLine(string.Format("{0}:{1}", status.User.ScreenName, status.Text));
    }
    else if(message is EventMessage)
    {
        var ev = message as EventMessage;
        Console.WriteLine(string.Format("{0}:{1}->{2}",
            ev.Event, ev.Source.ScreenName, ev.Target.ScreenName));
    }
}

IE<Status>で返ってくるREST APIのタイムラインとほとんど同じように扱えるため、Streamingであるということをあまり意識せずにプログラミングできます。

しかしこのコードでは同期処理のため、このforeachから先に進むことはできません。非同期処理を行うには自前でTaskを使うなど、非同期用のコードを書く必要があります。

次に紹介するCoreTweet.Streaming.Rxを使えばRxの力を使って、非同期処理を簡単に記述できます

CoreTweet.Streaming.Rxを使う

*CoreTweet.Streaming.Rxを使うにはまず以下のimportを増やす必要があります

using System.Reactive.Linq;
using CoreTweet.Streaming;
using CoreTweet.Streaming.Reactive;
using System.Threading;

CoreTweet単体の時はIEnumerable<StreamingMessage>を生成しました。
Rxを使うので、今回はIObservable<StreamingMessage>発行します。

//publish stream
var streamRx = tokens.Streaming.StartObservableStream(StreamingType.User).Publish();

StartObservableStreamに渡す引数はStartStreamと同じです。Publish()IO<StreamingStatus>が発行されます。
発行の次は非同期処理の内容を記述していきます。この後はCoreTweetというよりもほとんどRxの機能なので、詳しく知りたい方はReactive Extentionsでググって見てください

//action
Action<StatusMessage> printStatus = (message) =>
{
    var status = (message as StatusMessage).Status;
    Console.WriteLine(string.Format("{0}:{1}", status.User.ScreenName, status.Text));
};

//subscribe actions for event
streamRx.OfType<StatusMessage>().Subscribe(printStatus);
streamRx.OfType<StatusMessage>().Subscribe(printStatus, onCompleted: () => Console.WriteLine("completed"));
streamRx.OfType<StatusMessage>().Subscribe(printStatus, onError: exception => Console.WriteLine(exception.ToString()));

同じようなSubscribeを何度もしてますが、オーバーロードを紹介しているだけで、本当は1つです。
OfType<StatusMessage>で受理するStreamingMessageの型を選んで、Subscribeで非同期に行う処理を記述します。
処理を書き終わったら接続します

//connect stream
var connection = streamRx.Connect();

Connect()の戻り値はIDisposableです。このconnectionオブジェクトがDisposeされると、Streaming接続も自動的に切断されます。
次のコードでは、接続後30秒経つと切断されます。

//connect stream
var connection = streamRx.Connect();
//wait
Thread.Sleep(30000);
//close connection
connection.Dispose();

以上でCoreTweetのStreamingAPI関連の機能の紹介が終わりました。
次回はリツイートやお気に入り追加など、各種APIの使い方をまとめて紹介します。