How to Export Raw Mixpanel Data to Your Database

Mixpanel is great, but it’s very helpful to have your raw event data in your database so you can use SQL to extract insights. In this post, I’ll show you how you can accomplish that. We’ll be using Ruby on Rails and PostgreSQL, but the general approach of pulling data from Mixpanel, inserting it into the database, and scheduling a background job on an interval should work with any language and database.

Overview

Here’s what we’ll cover:

There are some considerations that make this not so simple:

Dependencies

Here are some of the technologies we’re using:

The background job

class EventWorker
  include Sidekiq::Worker
  sidekiq_options retry: false, unique: :until_executed, log_duplicate_payload: true
  PT = Time.find_zone!('America/Los_Angeles')

  def perform
    from_date = PT.now.hour < 5 ? PT.yesterday : PT.today
    EventService.ingest_mixpanel_events(from_date: from_date, to_date: PT.today)
  end
end

We specify retry: false because if the job fails, it’s okay, let it try again on the next interval.

The unique and log_duplicate_payload options are provided by sidekiq-unique-jobs. This will create a lock and release it only when the job is finished executing. That way, only one of these jobs can run at a time. If someone tries to enqueue this job and one is still running, it will be logged, and it the duplicate won’t be processed.

The math with from_date is so that when 12:01am strikes, we don’t just ignore yesterday’s events, because our systems could still push an event that happened minutes ago, or Mixpanel could do the same if they have a delay in their processing. We’re effectively creating a 5 hour buffer here. You can adjust this to whatever you like (eg 1 hour, 8 hours, etc).

The service class

# frozen_string_literal: true

class EventService
  def self.ingest_mixpanel_events(from_date:, to_date:)
    ApplicationRecord.transaction do
      # Blow away all the events in this range
      Time.use_zone(PT) do
        query = <<-SQL.squish
          delete from events
          where time between '%s' and '%s'
        SQL
        sanitized_query = ActiveRecord::Base.send(:sanitize_sql_array, [query, from_date.beginning_of_day.to_s(:db), to_date.end_of_day.to_s(:db)])
        ActiveRecord::Base.connection.exec_query(sanitized_query, "EventService#ingest_mixpanel_events")
        log "Deleted events between #{from_date.beginning_of_day} and #{to_date.end_of_day}"
      end

      # Ingest the events in this range
      (from_date..to_date).each do |day|
        events = []
        jsonl_string = fetch_mixpanel_raw_data(from_date: day, to_date: day)
        log "Fetched JSONL events from mixpanel for #{day}"
        Tempfile.open(['mixpanel', '.json']) do |file|
          file.write(jsonl_string.force_encoding('UTF-8'))
          log "Wrote JSONL to #{file.path}"
          file.rewind
          file.each_line do |line|
            events << convert_line_to_event(line)
          end
        end

        # Perform the inserts
        ActiveRecord::Base.logger.silence do
          Event.import(events, batch_size: 500)
        end
        log "Imported events for #{day}"
      end
    end
  end

  def self.convert_line_to_event(line)
    raw_event = Oj.load(line)
    raw_time = Time.zone.at(raw_event.dig('properties', 'time')) # This is a Pacific unix timestamp, but Rails doesn't know that
    pacific_time = Time.find_zone!('America/Los_Angeles').parse(raw_time.iso8601.gsub(/Z$/, '')) # Tell Rails it's a Pacific timestamp
    raw_event['properties']['time'] = pacific_time.utc # Convert the timestamp to UTC
    build_event_from_raw_event(raw_event)
  end

  def self.fetch_mixpanel_raw_data(from_date:, to_date:)
    conn = Faraday.new do |conn|
      conn.basic_auth(Rails.application.secrets.mixpanel_api_secret, '')
      conn.request :url_encoded
      conn.adapter Faraday.default_adapter
      conn.options.timeout = 60 # 60 seconds
    end
    resp = conn.get('https://data.mixpanel.com/api/2.0/export/', {from_date: from_date, to_date: to_date})
    resp.body
  end

  def self.build_event_from_raw_event(raw_event)
    # Choose whatever Mixpanel event properties you want in your table
    Event.new({
      name: raw_event['event'],
      time: raw_event.dig('properties', 'time'),
      distinct_id: raw_event.dig('properties', 'distinct_id')
    })
  end

  private

  def self.log(msg)
    Rails.logger.tagged(self.name) do
      Rails.logger.debug(msg)
    end
  end
end

There’s a lot going on here, so let’s try and break it down.

ingest_mixpanel_events the main method of the class. It’s responsible for deleting events in our database in the from_date..to_date range, asking Mixpanel for the events in this range, and performing the database inserts. Why do we need to delete events in this range first? The reason is because Mixpanel’s raw data export API doesn’t give you unique identifiers for each event, so you can’t do something like “give me all of today’s events, but don’t insert ones that I already have.” You could do this with timestamps, like “give me all of today’s events, but ignore ones up to $last_import_time,” but then there’s a chance you’ll miss some events if you or Mixpanel ever insert events before $last_import_time.

We use a Tempfile to store the response body from Mixpanel on disk rather than in an Array, which keeps the memory usage of this method low. Then we iterate over each line and call convert_line_to_event.

Everything is wrapped in a transaction because without one, if someone ran a query for today’s events and the background job just deleted today’s events, then the query would return nothing. But if they ran that query again in a few seconds when the background job has a chance to complete the inserts, then it would have results. Using a transaction prevents this confusing experience.

convert_line_to_event is responsible for taking line (String), parsing it as JSON, converting the timestamp to UTC, and calling build_event_from_raw_event. The timestamp work is necessary because Mixpanel stores events in your project’s timezone, and it’s best practise to store timestamps in your database as UTC.

fetch_mixpanel_raw_data is responsible for hitting Mixpanel’s raw data export API and returning the response body.

build_event_from_raw_event is responsible for taking raw_event (JSON) and creating an Event instance (assuming you have an Event model and events table).

The scheduler

every 1.minute, 'Run EventWorker' do
  EventWorker.perform_async
end

At PopSQL, we use Zhong to schedule jobs. Simply add a few lines to your clock.rb file to run the background job every minute:

Ways to make this better

How can we reduce the memory footprint of the background job? When I used memory_profiler to measure the memory usage, a lot of it was coming from initializing instances of Event for each row, and doing the time parsing/math.

@andrewkane pointed out I could pass activerecord-import arrays of columns and values, which is its fastest import mechanism, rather than creating Event instances.

We could also hit the Mixpanel API before opening the database transaction.

Conclusion

Now that your database has your Mixpanel events up to the minute, you can use PopSQL to extract all kinds of insights. Who are the newest users? Which users activated or need help activating? Who’s taking high value actions on the site right now that sales should follow up with? Enjoy, and feel free to chime in in the comments below.

Thanks to Nick Elser and Andrew Kane for guidance and suggestions.