概要
WWDJAPANというメディアのお仕事を手伝ってまして、そこでリアルタイムにサイトへのアクセス数をカウントしてランキングを表示する仕組みをAWSで作った(まだ、サイト側には公開していませんが)のでそれの仕組みとかコードを共有したいと思います。
ちなみにGoogle AnalyticsのリアルタイムAPIを使う方法もあるのですが、APIの使用申請をGoogleに送ったところ、何故かスルーされてしまったので独自に作る方針としました。
アーキテクチャ
APIとして以下の2つ必要になります。そしてバックエンドにはKinesisをおいてアクセスデータをバッファさせる仕組みです。
– サイトのPVを収集するためのAPI
– ランキングの結果を取得するAPI(WebSocketにしても面白いと思う)
PV収集用API
API GatewayとKinesis StreamをAWS Service Proxyの機能を使って直接接続します。
このAPIはサイトの記事詳細ページに埋め込んで、ページが表示するたびにその記事IDをRanking Consumer Streamに対して記事IDをPutします。
ここのシャード数はサイトからの同時接続数によって変わってくるので、実際に埋め込んでみて調整するのが良いでしょう。
そして、Ranking Consumerの背後にいるLambdaでRanking Collector Streamにシャード毎のデータをまとめてPutします。
Ranking Collector Streamのシャードは1本固定にしています。これはランキング取得APIから1本のシャードに対してデータをGetすることで集計をしやすくするという意図があります。また、Ranking Consumer Streamのシャードを増減させてもLambda自体のコードには手を加えなくてよいというメリットもあります。
ランキング取得API
GetShardIteratorとGetRecordsオペレーションで現在時刻から数分前までにputされたデータをKinesisから取り出します。
以下のように、ShardIteratorType
にAT_TIMESTAMP
を指定すると指定の時刻以降にシャードにputされたデータのみを取得してくれるので、これを現在時刻から5分前までなどに指定することでリアルタイム性を出しています。
await aws.request('Kinesis', 'getShardIterator', {
ShardId: 'shardId-000000000000',
ShardIteratorType: 'AT_TIMESTAMP',
StreamName: streamName,
Timestamp: dt // ここに指定したタイムスタンプ以降にシャードにputされたデータのみを取得する
})
そして以下のようなランキングのJsonを生成してAPI に返却します。ランキング結果
[
{
"post_id": 795075,
"count": 58
},
{
"post_id": 794978,
"count": 43
},
{
"post_id": 794963,
"count": 32
},
{
"post_id": 754499,
"count": 25
},
{
"post_id": 795236,
"count": 20
},
{
"post_id": 795228,
"count": 19
},
{
"post_id": 795939,
"count": 18
},
{
"post_id": 795021,
"count": 18
},
{
"post_id": 795841,
"count": 16
},
{
"post_id": 793589,
"count": 15
}
]
構成管理
Serverless Frameworkを使用しています。以下がそのserverless.yamlファイルの全貌です。serverless.yml
service: realtime-ranking
provider:
name: aws
runtime: nodejs8.10
stage: dev
region: ap-northeast-1
functions:
rankingCollector:
handler: lambda/apiGateway/rankingCollector.handler
events:
- http:
path: /ranking
method: get
cors: true
environment:
MINUITES_OF_RANKING_TERM: 10
COUNT_OF_RANKING: 10
COLLECTOR_STREAM_NAME: { Ref: 'RankingCollectorStream' }
iamRoleStatements:
- Effect: Allow
Action:
- 'kinesis:GetRecords'
- 'kinesis:GetShardIterator'
Resource: {"Fn::GetAtt":[ "RankingCollectorStream", "Arn" ]}
rankingConsumer:
handler: lambda/kinesisStreams/rankingConsumer.handler
events:
- stream:
type: kinesis
batchSize: 1000
arn:
Fn::GetAtt:
- RankingConsumerStream
- Arn
environment:
COLLECTOR_STREAM_NAME: { Ref: 'RankingCollectorStream' }
iamRoleStatements:
- Effect: Allow
Action:
- 'kinesis:PutRecord'
Resource: {"Fn::GetAtt":[ "RankingCollectorStream", "Arn" ]}
custom:
stage: ${opt:stage, self:provider.stage}
apiGatewayServiceProxies:
- kinesis:
path: /ranking
method: post
streamName: { Ref: 'RankingConsumerStream' }
cors: true
resources:
Resources:
RankingConsumerStream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 3
Name: ${self:service}-${self:custom.stage}-ranking-consumer
RankingCollectorStream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 1
Name: ${self:service}-${self:custom.stage}-ranking-collector
plugins:
- serverless-apigateway-service-proxy
- serverless-iam-roles-per-function
今回、KinesisとAPI GatewayをLambdaを挟むことなく直接接続するために、Serverless API Gateway Service ProxyというServerless Frameworkのプラグインを開発して公開しました。
今までであれば、これを実現するために膨大なCloudFormationを記述しないといけなかったのですが、以下のようにたった数行で実現できてしまいます。是非使う機会などあれば、GitHubにスター付けてくれたりフィードバックもらえると嬉しいです。
custom:
apiGatewayServiceProxies:
- kinesis:
path: /ranking
method: post
streamName: { Ref: 'RankingConsumerStream' }
cors: true
というわけで、みなさまよいリアルタイムランキングライフを!