Skip to main content

Source Connector

There are only TWO requirements when it comes to writing a valid Nakji Network Source Connector:

  1. Write protobuf (proto3) messages to Nakji Message Queue (Kafka) using transactions.
  2. Manifest file that contains connector metadata (eg title, author, version).

There are no DSLs (domain specific languages) you have to learn, no weird structures you have to abide by, and no awkward terminology you have to remember.

Just Send Data.


This is an example of a barebones Ethereum connector which ingests Block and Transaction data into Nakji, written in Golang.

Check out the repo if you prefer to dive directly in the code :)


This is beta software! Bug reports and suggestions are highly encouraged :)


More details in the Readme


First, a protobuf definition file needs to be created. Protobuf is the platform-neutral data format for almost all the data in motion within Nakji, whether it's into or out of the Nakji Message Queue. Make sure to use Version 3 of the Protobuf spec (proto3 documentation).

syntax = "proto3";

import "google/protobuf/timestamp.proto";

package ethereum;

option go_package = "";

// to convert addresses from bytes to hex address,
message Transaction {
google.protobuf.Timestamp ts = 1; //uint64
bytes From = 2;
string Hash = 3;
double Size = 4;
uint64 AccountNonce = 5; // uint64
uint64 Price = 6; //
uint64 GasLimit = 7; // uint64
bytes Recipient = 8;
uint64 Amount = 9; //
bytes Payload = 10;
uint64 V = 11; //
uint64 R = 12; //
uint64 S = 13; //

message Block {
google.protobuf.Timestamp ts = 1; //uint64
string Hash = 2;
uint64 Difficulty = 3; //bigint
uint64 Number = 4; //bigint
uint64 GasLimit = 5; // uint64
uint64 GasUsed = 6; // uint64
uint64 Nonce = 7; //[8]byte .Uint64()

The proto definitions can be compiled by the protoc compiler into any language it supports (currently 7) to enable seamless cross-platform/cross-service communication. For purposes of this example, follow the Protobuf for Golang guide here to get set up.


Create a file named manifest.yaml in the project root with the following fields.

package: ethereum
owner: nakji
version: 0.0.0


The cmd/ethereum/main.go file prepares all the config variables and Kafka connections.

// This connector ingests real time data from any EVM compatible chain.
package main

import (


func init() {
conf.SetDefault("rpcs.ethereum.full", []string{"wss://<api_key>"})

func main() {
c := connector.NewConnector()

// Get config variables using functions from Viper (
RPCURLs := c.Config.GetStringSlice("rpcs.ethereum.full")

// For the purposes of this example, we'll just grab one of the websocket RPCs
var RPCURL string
for _, u := range RPCURLs {
if strings.HasPrefix(u, "ws") {

ethConnector := ethereum.EthereumConnector{
Connector: c,


Main connector code

The ethereum.go file connects to the data source (in this case, an Ethereum RPC). When data is received, it is cleaned and shaped into the Protobuf format defined above, and sent to the correct Topics in the Nakji Message Queue (Kafka).

// ethereum package follows to
// subscribe to new Blocks and Transactions and writes the results to Nakji.
package ethereum

import (


type EthereumConnector struct {
*connector.Connector // embed Nakji connector.Connector into your custom connector to get access to all its methods

RPCURL string

func (c *EthereumConnector) Start() {
// Listen for interrupt in order to cleanly close connections later
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)

// connect to Ethereum RPC websockets
log.Info().Str("url", c.RPCURL).Msg("connecting to Ethereum RPC")
client, err := ethclient.DialContext(context.Background(), c.RPCURL)
if err != nil {
log.Fatal().Err(err).Msg("Ethereum RPC connection error")
defer client.Close()

// Subscribe to headers
headers := make(chan *types.Header)
sub, err := client.SubscribeNewHead(context.Background(), headers)
if err != nil {

// Main loop to process errors and headers
go func() {
for {
select {
case err := <-sub.Err():
case header := <-headers:
block, err := client.BlockByHash(context.Background(), header.Hash())
if err != nil {
log.Fatal().Err(err).Msg("BlockByHash error")


// EthBlock -> Block -> Protobuf -> kafka
var blockData Block
err = c.ProduceMessage("ethereum", "ethereum", &blockData)
if err != nil {
log.Error().Err(err).Msg("Kafka write proto")

// EthTransaction -> Transaction -> Protobuf -> Kafka
for _, tx := range block.Transactions() {
txData := Transaction{}
txData.Ts = blockData.Ts // Timestamp isn't in the raw transaction from geth

err = c.ProduceMessage("ethereum", "ethereum", &txData)
if err != nil {
log.Error().Err(err).Msg("Kafka write proto")

// Commit Kafka Transaction
err = c.Producer.CommitTransaction(nil)
if err != nil {
log.Error().Err(err).Msg("Processor: Failed to commit transaction")

err = c.Producer.AbortTransaction(nil)
if err != nil {
// Start a new transaction
err = c.BeginTransaction()
if err != nil {


for {
select {
case <-interrupt:

// Cleanly close the connection by sending a close message and then
// waiting (with timeout) for the server to close the connection.

func PrintBlock(block *types.Block) {
fmt.Printf("hash: %s\n", block.Hash().Hex()) // 0xbc10defa8dda384c96a17640d84de5578804945d347072e091b4e5f390ddea7f
fmt.Printf("num: %v\n", block.Number().Uint64()) // 3477413
fmt.Printf("time: %v\n", block.Time()) // 1529525947
fmt.Printf("nonce: %v\n", block.Nonce()) // 130524141876765836
fmt.Printf("#tx: %v\n", len(block.Transactions())) // 7
fmt.Printf("gaslim: %v\n", block.GasLimit()) // 1529525947
fmt.Printf("gasuse: %v\n", block.GasUsed()) // 1529525947
fmt.Printf("diff: %v\n", block.Difficulty()) // 1529525947

The full code for this example is here and contains some of the helper functions used above. You can find other examples here.


In some cases, a source connector may use input data from the Nakji message queue instead of an external source. This reduces the need for every connector to make a new RPC request, at the cost of slightly increasing latency. For example:


This is a rare use case.


Feel free to reach out to the team if you run into issues or need assistance!