Integrating Crypto Market Data From Multiple Sources

  • ZYKJ
    • Aug 2024
    • 4

    Integrating Crypto Market Data From Multiple Sources

    Cryptocurrency trading professionals typically use Python, Java, Rust, or C++ for exchange API access. Single-source data access has inherent limitations, such as data loss during server downtime, delayed updates, and limited data quality and refresh rates, all of which can impact trading decisions.

    This article introduces a DolphinDB solution that accesses market streams (using Binance Exchange as an example) through both the WebSocket and REST APIs. Performance tests show that this integrated approach improves continuity of data access, broadens coverage, and improves data quality by providing more timely and frequent market updates. With higher-frequency data refreshes within shorter intervals, traders can achieve better market responsiveness and make more informed decisions.

    1. Background
    Binance offers two ways of accessing market streams:

    REST API: Suitable for retrieving static data but requires polling for real-time updates.
    WebSocket API: Designed for accessing real-time market streams. By maintaining a persistent connection, it enables low-latency retrieval of the latest data.
    Binance provides multiple base endpoints, each with distinct performance and stability characteristics. We recommend that users test the performance and stability of each base endpoint on the designated server in their specific business context, and determine the optimal number of data sources and retrieval methods based on their needs. The proposed approach combines the strengths of both methods to enhance performance and stability by subscribing to data on different base URLs with the same account and setting different processing frequencies.
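
    To compare candidate endpoints, a simple latency probe can be scripted in DolphinDB. The following sketch is illustrative and not from the original article: the endpoint list and the helper name probeLatency are placeholders, and it uses the HttpClient plugin introduced in Section 3.2.

    Code:
    loadPlugin("httpClient")
    // Time a single depth request against a base endpoint; returns elapsed milliseconds
    def probeLatency(baseUrl){
        param = dict(STRING, STRING)
        param['symbol'] = "BTCUSDT"
        start = now()
        httpClient::httpGet(baseUrl + "/fapi/v1/depth", param, 10000)
        return now() - start
    }
    // Substitute the base endpoints you plan to compare
    candidates = ["https://fapi.binance.com"]
    for (u in candidates){
        print(u + ": " + string(probeLatency(u)) + " ms")
    }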

    2. Implementation Overview
    The integrated market data solution comprises the following components.


    Accessing Market Streams from Binance Exchange

    For each cryptocurrency pair, market data is accessed from Binance via both WebSocket API (with subscription frequency at 100 ms) and REST API (with polling interval of 50 ms).

    Processing Data from WebSocket API

    Subscribing to the data from WebSocket API, and writing it to a shared stream table.
    Persisting the stream table to a DFS partitioned table.
    Processing Data from REST API

    Setting up HTTP requests to fetch data from REST API.
    Writing the data to a shared stream table.
    Persisting the stream table to a DFS partitioned table.
    Cleaning and Integrating Received Data

    Creating a shared dictionary to maintain the latest timestamp for each trading pair.
    Defining rules for filtering data with the latest timestamps, and integrating the selected streams into a shared stream table.
    Upon receiving new data, comparing the timestamps of incoming records with those in the dictionary to ensure only the latest data is written.
    Persisting the stream table to a DFS partitioned table.
    3. Accessing Market Streams
    This chapter outlines the steps of accessing market streams from Binance’s WebSocket and REST APIs using DolphinDB WebSocket and HttpClient Plugins.

    3.1 Accessing Data From WebSocket API
    We establish a WebSocket subscription task using the DolphinDB WebSocket plugin to access real-time market depth data from Binance WebSocket API.

    Step 1: Install and load the DolphinDB WebSocket plugin. This step can be skipped if the plugin is already loaded.

    Code:
    installPlugin("WebSocket")
    loadPlugin("WebSocket")
    Step 2: Create a shared stream table wssDepthST and persist it to disk.

    Code:
    colNames = `type`eventTime`transactionTime`code`firstId`lastId`finalId`bidPrice`bidQty`askPrice`askQty`currentTime
    colTypes = [SYMBOL, TIMESTAMP, TIMESTAMP, SYMBOL, LONG, LONG, LONG, DOUBLE[], DOUBLE[], DOUBLE[], DOUBLE[], TIMESTAMP]
    enableTableShareAndPersistence(table=streamTable(10000:0, colNames, colTypes), tableName=`wssDepthST, cacheSize=12000)
    Step 3: Define functions for handling market data.

    The market depth data accessed through WebSocket API is formatted as follows:

    Code:
    {"e": "depthUpdate",  "E": 1571889248277, "T": 1571889248276, "s": "BTCUSDT", "U": 390497796, "u": 390497878, "pu": 390497794,
     "b": [["7403.89", "0.002"], ["7403.90", "3.906"], ["7404.00", "1.428"] ,["7404.85", "5.239"], ["7405.43","2.562"]],
     "a": [["7405.96","3.340" ], ["7406.63", "4.525"], ["7407.08", "2.475"], ["7407.15", "4.800"], ["7407.20","0.175"]]}
    The following script defines functions for processing the incoming data in DolphinDB.

    Code:
    // Define parseDepth for parsing market depth data
    def parseDepth(rawData){
        rawDataDict = parseExpr(rawData).eval().data
        if (rawDataDict["e"]=="depthUpdate"){
            // Process bid data
            bTransposed = rawDataDict.b.double().matrix().transpose()
            bidPrice = bTransposed[0].enlist()
            bidQty = bTransposed[1].enlist()
            // Process ask data
            aTransposed = rawDataDict.a.double().matrix().transpose()
            askPrice = aTransposed[0].enlist()
            askQty = aTransposed[1].enlist()
            // Extract other relevant fields
            typ = rawDataDict["e"]  // named typ (not type) to avoid shadowing the built-in type function and to match the return statement
            eventTime = timestamp(rawDataDict["E"])
            transactionTime = timestamp(rawDataDict["T"])
            code = rawDataDict["s"]
            firstId = rawDataDict["U"]
            lastId = rawDataDict["u"]
            finalId = rawDataDict["pu"]
            currentTime = gmtime(now())
            // Construct a table
            return table(typ, eventTime, transactionTime, code, firstId, 
              lastId, finalId, bidPrice, bidQty, askPrice, askQty, currentTime)
        }
    }
    // WebSocket event handlers
    def onOpen(ws){
        writeLog("WebSocket opened to receive data")
    }
    
    def onMessage(mutable streamTable, ws, dataTable){
        for (data in dataTable[`msg]){
            res = parseDepth(data)
            streamTable.append!(res)
        }
    }
    
    def onError(ws, error){
        writeLog("WebSocket failed to receive data: " + error.string())
    }
    
    def onClose(ws, statusCode, msg){
        writeLog("Connection is closed, status code: " + statusCode.string() + ", " + 
          msg.string())
    }
    Step 4: Create a WebSocket subscription task.

    Code:
    // Connect to Binance WebSocket API
    url = "wss://fstream.binance.com/stream?streams=btcusdt@depth20@100ms"
    config = dict(STRING, ANY)
    // Create a WebSocket subscription task
    ws = WebSocket::createSubJob(url, onOpen, onMessage{streamTable=wssDepthST}, onError, onClose, "wssDepth", config)
    Step 5: Manage the subscription.

    After establishing the subscription, use getSubJobStat to view the subscription status:

    Code:
    // Check subscription status
    WebSocket::getSubJobStat()
    To cancel the subscription, use cancelSubJob:

    Code:
    // Cancel subscription
    WebSocket::cancelSubJob("wssDepth")
    For the complete script for Binance WebSocket data access, refer to Appendix binanceDepthWS.

    3.2 Accessing Data From REST API
    We create a task to send periodic HTTP requests through the HttpClient Plugin for accessing the latest market depth data from Binance REST API.

    Step 1: Install and load the DolphinDB HttpClient plugin. This step can be skipped if the plugin is already loaded.

    Code:
    installPlugin("httpClient")
    loadPlugin("httpClient")
    Step 2: Create a shared stream table restDepthST and persist it to disk.

    Code:
    colNames = `eventTime`transactionTime`code`lastUpdateId`bidPrice`bidQty`askPrice`askQty`currentTime
    colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, LONG, DOUBLE[], DOUBLE[], DOUBLE[], DOUBLE[], TIMESTAMP]
    enableTableShareAndPersistence(table=streamTable(100000:0, colNames, colTypes), tableName=`restDepthST, cacheSize=12000)
    Step 3: Define functions for handling market data.

    The market depth data accessed through REST API is formatted as follows:

    Code:
    {"lastUpdateId": 1027024,"E": 1589436922972,"T": 1589436922959,
     "bids": [["4.00000000", "431.00000000"]],
     "asks": [["4.00000200", "12.00000000"]]}
    Define a getDepth function that sends HTTP requests at regular intervals (every 50 ms) to continuously fetch the latest market data. The retrieved data is parsed and ingested into a stream table.

    Code:
    // Define getDepth to fetch market data
    def getDepth(mutable restDepthST, code, baseUrl){
        do{
            try{
                param = dict(STRING, STRING)
                param['symbol'] = code
                res = httpClient::httpGet(baseUrl, param, 10000)

                // Parse the JSON response and split bids/asks into price and quantity vectors
                rawDataDict = parseExpr(res.text).eval()
                b = double(rawDataDict.bids)
                bTransposed = matrix(b).transpose()
                bidPrice = bTransposed[0]
                bidQty = bTransposed[1]
                a = double(rawDataDict.asks)
                aTransposed = matrix(a).transpose()
                askPrice = aTransposed[0]
                askQty = aTransposed[1]

                eventTime = timestamp(rawDataDict["E"])
                transactionTime = timestamp(rawDataDict["T"])
                lastUpdateId = long(rawDataDict["lastUpdateId"])
                currentTime = gmtime(now())

                resTable = table(eventTime as eventTime, transactionTime as transactionTime,
                    code as code, lastUpdateId as lastUpdateId, [bidPrice] as bidPrice,
                    [bidQty] as bidQty, [askPrice] as askPrice, [askQty] as askQty, currentTime as currentTime)
                restDepthST.append!(resTable)
                sleep(50)
            }
            catch(ex){
                print(ex)
                sleep(50)  // pause briefly before retrying to avoid a tight error loop
                continue
            }
        }while(true)
    }
    Step 4: Set the base endpoint and submit the job with the name getDepth_BTC.

    Code:
    baseUrl = "https://fapi.binance.com/fapi/v1/depth"
    submitJob("getDepth_BTC","getDepth_BTC", getDepth, restDepthST, "btcusdt", baseUrl)
    Step 5: Manage submitted jobs.

    Code:
    // View recent jobs
    getRecentJobs()
    // View intermediate job information
    getJobMessage("getDepth_BTC")
    // Cancel the job
    cancelJob('getDepth_BTC')
    For the complete script for Binance REST data access, refer to Appendix binanceDepthREST.

    4. Integrating Multi-Source Data
    The stream tables wssDepthST and restDepthST receive market data from the Binance WebSocket and REST APIs. Due to differences in subscription methods and delays, rules for aligning and cleaning this multi-source market data are required before integration. These rules compare the timestamp of each incoming record with the latest timestamp on record to determine whether it provides the most recent information.

    Specifically, we implement data integration by creating a shared dictionary that stores the latest timestamp for each cryptocurrency pair, and by using a stream table latestDepthST to store the integrated market data. The table subscribes to wssDepthST and restDepthST with handler rules that retain only the records carrying the latest timestamps.

    Step 1: Create a shared dictionary to maintain the latest timestamp for each trading pair, tracking its most recent update time.

    Code:
    latestTs=syncDict(SYMBOL,TIMESTAMP)
    latestTs['BTCUSDT'] = 2024.05.16 06:39:17.513
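    The creation script for latestDepthST is provided in the appendix; the sketch below infers its schema from the columns selected by toMerged in Step 2 (the column types mirror restDepthST plus a dataroute tag for the source, so treat them as assumptions).

    Code:
    colNames = `code`eventTime`lastUpdateId`bidPrice`bidQty`askPrice`askQty`currentTime`dataroute
    colTypes = [SYMBOL, TIMESTAMP, LONG, DOUBLE[], DOUBLE[], DOUBLE[], DOUBLE[], TIMESTAMP, SYMBOL]
    enableTableShareAndPersistence(table=streamTable(100000:0, colNames, colTypes), tableName=`latestDepthST, cacheSize=12000)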
    Step 2: Set rules for filtering and ingesting data from restDepthST and wssDepthST into the table latestDepthST. By extracting the symbol and event time of each incoming record, the rules check whether the record carries the latest timestamp. If so, the shared dictionary is updated and the record is appended to latestDepthST.

    Code:
    def toMerged(mutable latestTs, routeName, msg){
        symbolID = exec code from msg limit 1
        Timets = exec eventTime from msg limit 1
        lts = latestTs[symbolID]
        if(Timets > lts){
            latestTs[symbolID] = Timets
            if(routeName == 'wssapi'){
                // wssDepthST has no lastUpdateId column; use its lastId field (Binance "u") in its place
                appendTestTb = select code, eventTime, lastId as lastUpdateId, bidPrice, bidQty, askPrice, askQty, currentTime, routeName as dataroute from msg
            }
            else{
                appendTestTb = select code, eventTime, lastUpdateId, bidPrice, bidQty, askPrice, askQty, currentTime, routeName as dataroute from msg
            }
            objByName("latestDepthST").append!(appendTestTb)
        }
    }
    Step 3: Subscribe to wssDepthST and restDepthST and specify the handler with filtering rules.

    Code:
    subscribeTable(tableName="restDepthST", actionName="restTolive", offset=-1, handler=toMerged{latestTs, 'restapi'}, msgAsTable=true, batchSize=1, throttle=0.001)
    subscribeTable(tableName="wssDepthST", actionName="wssTolive", offset=-1, handler=toMerged{latestTs, 'wssapi'}, msgAsTable=true, batchSize=1, throttle=0.001)
    This method allows subscriptions to multiple sources to be integrated into one table. Rules should be specified based on the format of the obtained market data.
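
    Chapter 2 also calls for persisting each stream table to a DFS partitioned table; the appendix scripts contain the full version. The sketch below illustrates the pattern for latestDepthST. The database path dfs://binanceDepth, the date range, and the sort columns are assumptions, and the TSDB engine is used because DFS tables with array-vector columns require it.

    Code:
    // Create a date-partitioned TSDB database and a DFS table matching latestDepthST
    if (!existsDatabase("dfs://binanceDepth")){
        db = database("dfs://binanceDepth", VALUE, 2024.01.01..2024.12.31, engine="TSDB")
        db.createPartitionedTable(table=objByName("latestDepthST"), tableName="latestDepth",
            partitionColumns="eventTime", sortColumns=`code`eventTime)
    }
    // Append new latestDepthST records to the DFS table in batches
    subscribeTable(tableName="latestDepthST", actionName="toDFS", offset=-1,
        handler=loadTable("dfs://binanceDepth", "latestDepth"), msgAsTable=true,
        batchSize=10000, throttle=1)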

    5. Performance Test
    We test the data coverage of the WebSocket and REST APIs, used separately and together, by measuring the data volume received within 24 hours. A larger data volume indicates a higher average refresh rate.

    In our test environment, we obtain BTCUSDT depth data in two ways: a WebSocket subscription at a 100 ms frequency (the maximum available speed) and REST polling at a 50 ms interval. We then filter the latest market data into a single table. The 50 ms polling interval strikes a balance between update frequency and bandwidth when acquiring market data across multiple currency pairs; because it is faster than the WebSocket's 100 ms push frequency, the REST polling method yields a higher data volume.

    After 24 hours of data acquisition:

    REST polling: 1,383,480 records
    WebSocket subscription: 644,808 records
    Duplicate records: 10,020
    The latest market table latestDepthST receives 1,753,067 new records in total, with the per-source composition shown below.

    [Chart: composition of latestDepthST records by data source]
    From the results, we can see that the number of new records added to latestDepthST exceeds the number obtained through either the WebSocket or the REST API alone. Moreover, the higher proportion of REST data in the table indicates that this multi-source integration approach increases the frequency and timeliness of data acquisition compared to a single access method. Additionally, since the data obtained from the WebSocket and REST APIs is pushed from different servers, this method avoids the problems caused by a single server failure. This further demonstrates that multi-source market data access can obtain market streams more quickly while reducing the risks associated with network fluctuations and exchange service failures.

    6. Conclusion
    This article introduces a DolphinDB-based solution for accessing and integrating multi-source market streams in the cryptocurrency market. By integrating multiple data sources and implementing efficient data cleansing and storage, DolphinDB not only improves the refresh frequency and timeliness of data acquisition but also ensures data continuity and reliability. The test results show that this approach significantly increases the amount of market data obtained and shortens the data refresh interval, providing traders with more reliable and timely market data.

    In summary, DolphinDB’s multi-source market data integration solution significantly enhances the cryptocurrency market data acquisition process, providing traders with a superior framework for strategic trading decisions and activities.

    7. Appendix
    wssDepthST (sample table)
    binanceDepthWS (script for Section 3.1)
    binanceDepthREST (script for Section 3.2)
    binanceDepthJoint (script for Chapter 4)
  • RosaIwant
    New Member
    • Sep 2024
    • 3

    #2
    Looking forward to exploring DolphinDB more based on your insights.


    • dracpet
      New Member
      • Jan 2025
      • 1

      #3
      I get an error from the parseDepth function: “Cannot recognize the token b”. I also noticed that in the return statement, type is written as typ. It would be really cool to get this working. Please help 🙏


      • KennynotDead
        New Member
        • Aug 2025
        • 1

        #4
        I've had similar issues syncing data across platforms, especially with inconsistent timestamp formats. What helped me was building a lightweight middleware layer that standardizes all incoming data before it hits my database. I use axiom crypto bot to feed the market data into that layer—it handles a good variety of sources, and its output is pretty clean, which saves me from doing too much reformatting myself.


        • natashasturrock
          New Member
          • Jul 2025
          • 17

          #5
          For cryptocurrency trading, relying on a single data source can limit your access to timely and accurate market information. To address this, a DolphinDB-based multi-source solution can integrate both Binance WebSocket and REST APIs to improve market data coverage and refresh rates.

          The approach works by:

          - Accessing real-time data via WebSocket at 100 ms intervals and polling REST API at 50 ms intervals.

          - Processing and persisting incoming data into shared stream tables for further analysis.

          - Integrating multi-source data using timestamp-based rules to ensure only the latest market updates are retained.

          Performance tests show that combining WebSocket and REST APIs results in more comprehensive data coverage, higher refresh frequency, and improved timeliness compared to using either source alone. This multi-source setup also mitigates risks from server downtime or network issues.

          In short, DolphinDB’s integration framework provides traders with continuous, reliable, and timely market data, enabling faster and more informed trading decisions.


