Apollo GraphQL and Kotlin: create a back-to-back subscription system

What is Apollo?

The main strengths of Apollo GraphQL are the standardization of your API, its self-documentation, and its flexibility in shaping responses, thanks in particular to the schema system and to test tools such as the playground.

Why back to back?

I recently needed to implement a push server for a Kotlin client built with the TOCK bot framework. My client is therefore itself a Kotlin server, one that pushes messages to Facebook Messenger.

Architecture

A WebSocket between two servers? This is not the conventional use case, but it lets Android or Web clients share the same push server. It is also the only option for a Facebook Messenger bot, because the connection has to be handled server side.

For more intensive, industrialized use, there is also an Apollo Redis package for Node.js that can replace the socket between the two servers.

Node server

GraphQL schema

const typeDefs = `
  # Data type renamed from "Subscription": that name is already used by the
  # root subscription type below, and a schema cannot define a type twice.
  type UserSubscription {
    id_user: ID!
    data: [Message]!
  }
  type Message {
    title: String!
    message: String!
  }
  input MessageInput {
    title: String!
    message: String!
  }
  type Query {
    subscriptions: [UserSubscription]
  }
  type Mutation {
    addSubscription(id_user: ID!, data: [MessageInput]!): UserSubscription
    updateSubscription(id_user: ID!, data: [MessageInput]!): UserSubscription
    deleteSubscription(id_user: ID!): UserSubscription
  }
  type Subscription {
    SubscriptionUpdated: UserSubscription
  }
  schema {
    query: Query
    mutation: Mutation
    subscription: Subscription
  }
`;

module.exports = {
  typeDefs
};

Resolvers

const {PubSub} = require('graphql-yoga');
const pubsub = new PubSub();

// `service` is not imported here: the server index exposes it as a global.
const resolvers = {
  Query: {
    subscriptions: () => {
      return service.subscriptions()
    },
  },
  Mutation: {
    addSubscription: (root, args) => {
      const data = {
        data: args.data
      };
      service.creatSubscription( args.id_user, data );
      data.id_user = args.id_user;
      return data;
    },
    updateSubscription: (root, args) => {
      const data = {
        data: args.data
      };
      service.updateSubscription( args.id_user, data );
      data.id_user = args.id_user;
      // Push the new state to every client subscribed to SubscriptionUpdated.
      pubsub.publish('SubscriptionUpdated', {SubscriptionUpdated: data});
      console.log('Data updated', data);
      return data;
    },
    deleteSubscription: (root, args) => {
      service.deleteSubscription( args.id_user );
      console.log('Data deleted', args.id_user);
      return {
        id_user: args.id_user
      };
    }
  },
  Subscription: {
    SubscriptionUpdated: {
      subscribe: () => pubsub.asyncIterator('SubscriptionUpdated'),
    }
  }
};

module.exports = {
  resolvers
};
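To make the publish/subscribe flow used by these resolvers more concrete, here is a minimal sketch of the mechanism. It is not the graphql-yoga implementation: `TinyPubSub` and `demo` are hypothetical names, and the sketch relies only on Node's built-in `events.on`, which exposes the events of an emitter as an async iterator.

```javascript
const { EventEmitter, on } = require('events');

// Hypothetical stand-in for a PubSub engine (not the graphql-yoga class):
// publish() emits an event, asyncIterator() exposes the events of one topic.
class TinyPubSub {
  constructor() {
    this.emitter = new EventEmitter();
  }

  publish(topic, payload) {
    this.emitter.emit(topic, payload);
  }

  asyncIterator(topic) {
    // events.on() starts listening immediately and buffers events
    // until they are consumed with next().
    return on(this.emitter, topic);
  }
}

async function demo() {
  const pubsub = new TinyPubSub();
  // What the `subscribe` resolver does when a client opens a subscription.
  const iterator = pubsub.asyncIterator('SubscriptionUpdated');

  // What the updateSubscription mutation does after saving the data.
  pubsub.publish('SubscriptionUpdated', {
    SubscriptionUpdated: { id_user: '42', data: [] }
  });

  // Each value is the array of arguments that was passed to emit().
  const { value } = await iterator.next();
  await iterator.return(); // stop listening
  return [value[0]];
}
```

Calling `demo()` resolves with the payloads received on the topic. In the real server, the subscription transport consumes the iterator and forwards each payload over the WebSocket.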

Schema index

const { typeDefs } = require('./typeDefs');
const { resolvers } = require('./resolvers');
const { makeExecutableSchema } = require('graphql-tools');

const schema = makeExecutableSchema({
  typeDefs: typeDefs,
  resolvers: resolvers,
});

module.exports = {
  schema
};

Node.js server index

const express = require('express');
const bodyParser = require("body-parser");
const service = require('./service');
const { GraphQLServer, PubSub } = require('graphql-yoga');
// require('./index') returns { schema }, which is passed as the server options.
const schema = require('./index');
const pubsub = new PubSub();
const app = new GraphQLServer(schema);

// The resolvers read `service` from the global scope.
global.service = service;

const options = {
  port: process.env.PORT || 3000,
  endpoint: '/graphql',
  subscriptions: '/subscriptions',
  playground: '/playground'
};

app.use(bodyParser.json());

app.start(options, ({ port }) =>
  console.log('App listening on port ' + options.port + '!')
);
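The `./service` module required by the server is not shown in this article. As a purely illustrative sketch, assuming an in-memory store is enough, it could look like the following. Only the four method names come from the resolvers; the `Map` storage and everything else are assumptions.

```javascript
// service.js: hypothetical in-memory implementation, keyed by id_user.
const store = new Map();

const service = {
  // Returns every stored subscription in the shape the schema expects.
  subscriptions: () =>
    Array.from(store, ([id_user, value]) => ({ id_user, data: value.data })),

  // Method name spelled exactly as it is called in the resolvers.
  creatSubscription: (id_user, value) => {
    // Copy the messages so later changes to `value` do not affect the store.
    store.set(id_user, { data: value.data });
  },

  updateSubscription: (id_user, value) => {
    store.set(id_user, { data: value.data });
  },

  deleteSubscription: (id_user) => {
    store.delete(id_user);
  },
};

module.exports = service;
```

A real deployment would replace the `Map` with a database, since the data is lost whenever the process restarts.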

Kotlin server client

Android project

You can create a new Gradle configuration or reuse the one in your project. I took the easy route: following the Apollo Android documentation with an empty Android project in Android Studio.

schema.json is the result of an introspection query run with apollo-codegen against the running server (use the port your server listens on):

apollo-codegen download-schema http://localhost:8080/graphql --output schema.json

The other files must contain your queries, mutations and subscriptions in the client query format (only fields declared in the schema can be selected):

query Subscriptions {
  subscriptions {
    id_user
    data { title, message }
  }
}

mutation Subscription ($id_user: ID!, $data: [MessageInput]!) {
  addSubscription (
    id_user: $id_user,
    data: $data,
  ) {
    id_user,
    data { title, message }
  }
}

subscription SubscriptionUpdated {
  SubscriptionUpdated {
    id_user
    data { title, message }
  }
}

You need to modify your Gradle files to import Apollo dependencies.

In the global Gradle file:

dependencies {
    classpath 'com.android.tools.build:gradle:3.0.1'
    classpath 'com.apollographql.apollo:apollo-gradle-plugin:0.4.4'
    classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
}

In the app’s Gradle file:

apply plugin: 'com.android.application'
apply plugin: 'com.apollographql.android'
apply plugin: 'kotlin-android'
apply plugin: 'kotlin-android-extensions'

The code is generated after a Gradle build in app/build/generated/source/apollo.

Generated code

Apollo Instance and WebSocket

Apollo client code:

This code creates the instances of the generated services (HTTP client, WebSocket). It also contains the URLs of our Node.js server:

import com.apollographql.apollo.ApolloCall
import com.apollographql.apollo.ApolloClient
import com.apollographql.apollo.ApolloSubscriptionCall
import com.apollographql.apollo.rx2.Rx2Apollo
import com.apollographql.apollo.subscription.WebSocketSubscriptionTransport
import fr.client.apollo.mutations.SubscriptionMutation
import fr.client.apollo.queries.SubscriptionsQuery
import fr.client.apollo.subscriptions.SubscriptionUpdatedSubscription
import fr.apollo.type.MessageInput
import okhttp3.OkHttpClient
import okhttp3.logging.HttpLoggingInterceptor
import java.util.concurrent.TimeUnit

object ApolloClient {
    private val BASE_URL_GRAPHQL = "http://localhost:3000/graphql"
    private val BASE_URL_SUBSCRIPTIONS = "ws://localhost:3000/subscriptions"
    private val apolloClient: ApolloClient
    private val subscriptionQueryClient: SubscriptionsQuery
    private val subscriptionSubscriptionClient: SubscriptionUpdatedSubscription

    init {
        val logging = HttpLoggingInterceptor()
        logging.setLevel(HttpLoggingInterceptor.Level.BODY)
        val okHttpClient = OkHttpClient.Builder()
            .addInterceptor(logging)
            .pingInterval(30, TimeUnit.SECONDS)
            .build()
        apolloClient = ApolloClient.builder()
            .serverUrl(BASE_URL_GRAPHQL)
            .okHttpClient(okHttpClient)
            .subscriptionTransportFactory(WebSocketSubscriptionTransport.Factory(BASE_URL_SUBSCRIPTIONS, okHttpClient))
            .build()
        subscriptionQueryClient = SubscriptionsQuery.builder().build()
        subscriptionSubscriptionClient = SubscriptionUpdatedSubscription.builder().build()
        val observer = Rx2Apollo.from(getSubscriptionSubscriptionCall())
        observer.subscribeWith(SubscriptionSubscriber())
    }

    fun getApolloClient(): ApolloClient {
        return apolloClient
    }

    fun getSubscriptionQueryClient(): SubscriptionsQuery {
        return subscriptionQueryClient
    }

    fun getSubscriptionMutationClient(id_user: String, data: MutableList<MessageInput>): SubscriptionMutation {
        val builder = SubscriptionMutation.builder()
        builder.id_user(id_user)
        builder.data(data)
        return builder.build()
    }

    fun getSubscriptionSubscriptionClient(): SubscriptionUpdatedSubscription {
        return subscriptionSubscriptionClient
    }

    fun getSubscriptionQueryCall(): ApolloCall<SubscriptionsQuery.Data> {
        return apolloClient.query(subscriptionQueryClient)
    }

    fun getSubscriptionMutationCall(mutationBuilded: SubscriptionMutation): ApolloCall<SubscriptionMutation.Data> {
        return apolloClient.mutate(mutationBuilded)
    }

    fun getSubscriptionSubscriptionCall(): ApolloSubscriptionCall<SubscriptionUpdatedSubscription.Data> {
        return apolloClient.subscribe(subscriptionSubscriptionClient)
    }
}

WebSocket class:

This is where the pushes received on the socket are handled:

import com.apollographql.apollo.api.Response
import fr.client.apollo.subscriptions.SubscriptionUpdatedSubscription
import io.reactivex.subscribers.DisposableSubscriber
import mu.KotlinLogging

class SubscriptionSubscriber : DisposableSubscriber<Response<SubscriptionUpdatedSubscription.Data>>() {
    val logger = KotlinLogging.logger {}

    // Called for every payload pushed by the server on the socket.
    override fun onNext(subscriptionUpdated: Response<SubscriptionUpdatedSubscription.Data>) {
        logger.info { "Value received " + subscriptionUpdated.data()?.SubscriptionUpdated().toString() }
    }

    override fun onError(t: Throwable) {
        logger.info { "Error web socket " + t }
    }

    override fun onComplete() {
        logger.info { "Complete Called " }
    }
}

Example of a socket update that triggers a push to Facebook Messenger, using the TOCK bot framework:

import com.apollographql.apollo.api.Response
import fr.vsct.tock.bot.connector.messenger.MessengerConnector
import fr.vsct.tock.bot.connector.messenger.model.send.*
import fr.vsct.tock.bot.engine.action.ActionMetadata
import fr.vsct.tock.bot.engine.action.ActionNotificationType
import fr.vsct.tock.bot.engine.action.ActionPriority
import fr.vsct.tock.bot.engine.action.SendSentence
import fr.vsct.tock.bot.engine.user.PlayerId
import fr.vsct.tock.bot.engine.user.PlayerType
import fr.vsct.tock.bot.open.data.MessengerConfiguration.pageId
import fr.client.apollo.subscriptions.SubscriptionUpdatedSubscription
import fr.vsct.tock.bot.open.data.openBot
import io.reactivex.subscribers.DisposableSubscriber
import mu.KotlinLogging

class SubscriptionSubscriber : DisposableSubscriber<Response<SubscriptionUpdatedSubscription.Data>>() {
    val logger = KotlinLogging.logger {}

    override fun onNext(subscriptionUpdated: Response<SubscriptionUpdatedSubscription.Data>) {
        logger.info { "Value received in socket" }
        // Ignore responses without a payload instead of failing on `!!`.
        val update = subscriptionUpdated.data()?.SubscriptionUpdated() ?: return
        val id_user = update.id_user()
        val data = update.data()
        logger.info { "Push data to user $id_user" }
        logger.info { "With messages $data" }
        if (data.isNotEmpty()) {
            val messengerConnector = MessengerConnector.getConnectorByPageId(pageId)!!
            // First message: plain text announcing the update.
            val sentence1 = SendSentence(
                PlayerId(openBot.botId, PlayerType.bot),
                pageId,
                PlayerId(id_user, PlayerType.user),
                "Hello again :) I have some urgent information for you:",
                mutableListOf(),
                metadata = ActionMetadata(
                    priority = ActionPriority.high,
                    notificationType = ActionNotificationType.transportationUpdate
                )
            )
            // Second message: a generic template with up to 4 distinct messages.
            val sentence2 = SendSentence(
                PlayerId(openBot.botId, PlayerType.bot),
                pageId,
                PlayerId(id_user, PlayerType.user),
                null,
                mutableListOf(
                    AttachmentMessage(
                        Attachment(
                            AttachmentType.template,
                            GenericPayload(
                                data.distinctBy { it.message() }.take(4).map { message ->
                                    Element(
                                        message.title(),
                                        fr.vsct.tock.bot.open.data.OpenDataConfiguration.image,
                                        message.message()
                                    )
                                }
                            )
                        )
                    )
                ),
                metadata = ActionMetadata(
                    priority = ActionPriority.high,
                    notificationType = ActionNotificationType.transportationUpdate
                )
            )
            try {
                messengerConnector.sendEvent(sentence1)
                messengerConnector.sendEvent(sentence2)
            } catch (e: Exception) {
                logger.info { "Fail to push data to user $e" }
            }
        }
    }

    override fun onError(t: Throwable) {
        logger.info { "Error web socket " + t }
    }

    override fun onComplete() {
        logger.info { "Complete Called " }
    }
}

Manual tests with playground

There is also a mock mode for GraphQL that can be used in your unit tests, for example with Mocha and Chai.
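A push can also be triggered by hand from a small script instead of the playground. The sketch below builds a GraphQL-over-HTTP request for the `updateSubscription` mutation. It assumes the Node server from this article runs locally on port 3000 and that Node 18+ is available for the global `fetch`; `buildRequest`, `pushUpdate` and the operation name `UpdateSubscription` are hypothetical names chosen for the example.

```javascript
// Assumption: the Node server runs locally with the options shown earlier.
const GRAPHQL_URL = 'http://localhost:3000/graphql';

const UPDATE_SUBSCRIPTION = `
  mutation UpdateSubscription($id_user: ID!, $data: [MessageInput]!) {
    updateSubscription(id_user: $id_user, data: $data) {
      id_user
      data { title message }
    }
  }
`;

// Builds the fetch() options for a GraphQL POST request:
// a JSON body carrying the query text and its variables.
function buildRequest(id_user, data) {
  return {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      query: UPDATE_SUBSCRIPTION,
      variables: { id_user, data }
    })
  };
}

// Sends the mutation and resolves with the parsed JSON response.
// Requires a running server, so it is not called in this sketch.
async function pushUpdate(id_user, data) {
  const response = await fetch(GRAPHQL_URL, buildRequest(id_user, data));
  return response.json();
}
```

With the server started, `pushUpdate('42', [{ title: 'Delay', message: 'Train 10 minutes late' }]).then(console.log)` should make the Kotlin subscriber log the pushed value.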

Deployment

I deployed my app on Heroku, which supports WebSockets. Just remember to ping the socket regularly, because an idle connection is closed after 55 seconds by default:

val okHttpClient = OkHttpClient.Builder()
    .addInterceptor(logging)
    .pingInterval(30, TimeUnit.SECONDS)
    .build()

Theoretical limits

To overcome this kind of limit in a back-to-back setup, the best option, as mentioned above, is to switch the architecture to a Redis-based queuing solution: Apollo Redis.
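One way to prepare for such a change is to stop creating the PubSub inside the resolvers module and to inject it instead. The sketch below is an assumption about how this could be organized, not the code of this project: `createPushResolvers` is a hypothetical factory that accepts any engine exposing `publish(topic, payload)` and `asyncIterator(topic)`, so an in-memory engine could later be swapped for a Redis-backed one without touching the resolver logic.

```javascript
// Hypothetical factory: builds the push-related resolvers around any engine
// exposing publish(topic, payload) and asyncIterator(topic).
function createPushResolvers(pubsub, service) {
  return {
    Mutation: {
      updateSubscription: (root, args) => {
        const data = { data: args.data };
        service.updateSubscription(args.id_user, data);
        data.id_user = args.id_user;
        // The engine decides how the payload reaches the subscribers:
        // in memory for a single process, through a broker for several.
        pubsub.publish('SubscriptionUpdated', { SubscriptionUpdated: data });
        return data;
      }
    },
    Subscription: {
      SubscriptionUpdated: {
        subscribe: () => pubsub.asyncIterator('SubscriptionUpdated')
      }
    }
  };
}
```

The same factory can also be called with a fake engine in unit tests, which makes it possible to check what a mutation publishes without opening a socket.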

Amazon implementation

Conclusion

Adrien Body

Staff Engineer @SNCFConnect. I love discovering new technologies.