Python code to get data from HubSpot using HubSpot API

How to Use Python for HubSpot API Calls: HubSpot Data with Python

Hi everyone, I hope you are keeping well. I have put together this article to help you build a Python script that uses the HubSpot APIs.

HubSpot provides several APIs, but in this article we will mainly use the Contacts APIs. Don't worry: the Python logic stays the same for every HubSpot API; only the endpoint you call changes.

So even if you need a HubSpot API other than the Contacts API, I still recommend going through this article.

Before starting with the steps to build the Python script, you need some credentials. These credentials authenticate you with HubSpot so that you can extract data from it.

If you are just starting with HubSpot API configuration and are figuring out which credentials you need and how to generate them, refer to this HubSpot API Setup Guide.

So let's start.

To give you a high-level overview: we are going to create a JSON file to store all HubSpot credentials, a main Python file that works as the initiator and flow controller, and lastly a Python module with one function for each HubSpot API call.

Python Code for HubSpot API

1. Storing HubSpot Credentials in the JSON file:

First, we are going to create a JSON file that will store all credentials needed to make a successful call to HubSpot API.

I recommend storing all credentials in a JSON file because it makes them easy to maintain, update, and track as needed. Here we save this file as “hs_cred.json”.

{
    "access_token": "Replace with access token",
    "app_id": "Replace with app_id",
    "app_secret": "Replace with app secret",
    "count": 5,
    "api_key": "Replace with api_key"
}
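A minimal sketch of loading this file, assuming it sits at the path you pass in. The helper name load_hs_credentials and the required-key check are illustrative additions, not part of the original script; failing early on a missing key gives a clearer error than a KeyError deep inside an API call.

```python
import json


def load_hs_credentials(path, required=("api_key", "count")):
    """Read hs_cred.json and fail early if an expected key is missing.

    load_hs_credentials is a hypothetical helper; the default required keys
    mirror what hs_main.py reads from the file.
    """
    # "with" closes the file even if json.load raises
    with open(path, "r", encoding="utf-8") as f:
        creds = json.load(f)

    missing = [key for key in required if key not in creds]
    if missing:
        raise KeyError("hs_cred.json is missing: " + ", ".join(missing))
    return creds
```

For example, creds = load_hs_credentials("./hs_cred.json") returns the parsed dictionary, so creds["count"] is 5 for the file above.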

2. Main file, the center point for all controls:

In this step, we create the main file. It works as the initiator, and its responsibility is to call all the APIs, each of which is wrapped in its own function.

The code below is the main file at its very beginning, with just one function call. As we implement more HubSpot contact APIs, we will add their function calls to this file. We save this file as “hs_main.py”.

#!/usr/local/bin/python3
# command to run this code $ python3 ./python/hs_main.py
import sys
import datetime
import json

from get_hs_data import *

if __name__ == '__main__':
    try:
        timestamp = datetime.datetime.now().strftime('%Y-%m-%d : %H:%M')
        print("DATE : ", timestamp, "\n")
        print("HubSpot data extraction process Started")

        # reading json file
        hs_cred_file = "./hs_cred.json"  # check your file path
        # "with" makes sure the file is closed after reading
        with open(hs_cred_file, 'r') as f:
            hs_cred_json = json.load(f)

        api_key = hs_cred_json["api_key"]
        count = hs_cred_json["count"]
        # call the function that wraps the HubSpot All Contacts API
        contact_df = get_all_contact(api_key, count)
        print("contact_df :\n", contact_df)

        print("HS_MAIN : data extraction Process Finished \n")
    except Exception:
        print("HS_MAIN : data extraction processing Failed !!!!:", sys.exc_info())

Pause here and make sure you understand what we are doing and why we need it.

Before moving ahead, create a Python module file that will hold the functions for the HubSpot contact APIs, especially the ones we cover here. You can save the file as “get_hs_data.py”.

It starts with imports like the ones below.

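#!/usr/bin/python3
import datetime          # used for the conversion dates in contact_insight_extraction
import json
import sys
import urllib.parse      # used by urllib.parse.urlencode
from urllib import parse

import pandas
import requests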

There are multiple ways to get HubSpot contacts, i.e. different APIs to extract contact data, such as the all contacts API, the recently updated contacts API, and the recently created contacts API. Use whichever fits your implementation or application needs.
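Because only the endpoint changes between these APIs, one way to keep that difference in a single place is a small lookup table. The paths below are the three v1 endpoints used in this article; the CONTACT_ENDPOINTS name and the build_url helper are hypothetical, and authentication is left to the caller.

```python
from urllib.parse import urlencode

BASE_URL = "https://api.hubapi.com"

# The three v1 contact list endpoints used in this article
CONTACT_ENDPOINTS = {
    "all": "/contacts/v1/lists/all/contacts/all",
    "recently_updated": "/contacts/v1/lists/recently_updated/contacts/recent",
    "recently_created": "/contacts/v1/lists/all/contacts/recent",
}


def build_url(kind, **params):
    """Return the full URL for one of the endpoints above, with an encoded query string."""
    url = BASE_URL + CONTACT_ENDPOINTS[kind]
    if params:
        url += "?" + urlencode(params)
    return url
```

For example, build_url("all", count=5) gives https://api.hubapi.com/contacts/v1/lists/all/contacts/all?count=5.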

3. HubSpot All Contact API:

Get all contacts: this API returns all contacts that have been created in the given account. The response is a paginated list of contacts, with a maximum of 100 contacts per page.

The Python function for the HubSpot All Contacts API looks like the code below. It extracts only the vid and name fields for each contact, and only from the first page of results. You can extract more data as needed.

def get_all_contact(api_key, count):
    try:
        # To get all contacts (first page only; count can be at most 100)
        url = "https://api.hubapi.com/contacts/v1/lists/all/contacts/all"
        params = {"count": count, "hapikey": api_key}

        r = requests.get(url, params=params)
        r.raise_for_status()
        response_dict = r.json()

        # to get contact data
        contact_list = response_dict['contacts']

        rows = []
        for cnt in contact_list:
            # You can print cnt to see all available fields you get from this All Contacts API
            # print("\n", cnt)
            prop = cnt.get("properties", {})
            # .get() gives None for a missing field instead of reusing the previous contact's value
            rows.append({
                "vid": cnt.get("vid"),
                "first_name": prop.get("firstname", {}).get("value"),
                "last_name": prop.get("lastname", {}).get("value"),
            })

        # build the DataFrame once; DataFrame.append was removed in pandas 2.0
        return pandas.DataFrame(rows, columns=['vid', 'first_name', 'last_name'])
    except Exception:
        print("\nget_contact Failed", sys.exc_info())
        return None
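The function above reads only the first page. Following the pagination described earlier, the v1 All Contacts response also carries "has-more" and "vid-offset", and the next request passes that offset back as the vidOffset query parameter. A minimal sketch of that loop, assuming a hypothetical fetch_page(params) callable that performs the GET and returns the decoded JSON, so the paging logic can be tried without network access; iter_all_contacts is also a hypothetical name.

```python
def iter_all_contacts(fetch_page, count=100, max_pages=None):
    """Yield every contact from the v1 All Contacts endpoint, page by page.

    fetch_page is a hypothetical callable: it takes a dict of query parameters
    and returns the decoded JSON response as a dict.
    """
    params = {"count": count}
    pages = 0
    while True:
        # pass a copy so the caller never sees later changes to params
        response = fetch_page(dict(params))
        for contact in response.get("contacts", []):
            yield contact
        pages += 1
        # stop when HubSpot reports nothing left, or when an optional page limit is hit
        if not response.get("has-more") or (max_pages is not None and pages >= max_pages):
            break
        # continue from where this page ended
        params["vidOffset"] = response["vid-offset"]
```

With requests, fetch_page could be lambda p: requests.get(url, params={**p, "hapikey": api_key}).json(), and each yielded contact can be turned into a row exactly as in get_all_contact.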

4. HubSpot Recent Updated Contact API:

Get recently updated contacts: you can also think of this HubSpot API as “Get recently updated and created contacts”. It returns information about all contacts in a given account that were updated or created in the last 30 days.

The response is a paginated list, with a maximum of 100 contacts per page. The one thing to remember is that this endpoint only scrolls back 30 days in time.
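HubSpot's v1 contact endpoints express times such as "time-offset" as Unix timestamps in milliseconds (the contact-by-VID code later divides by 1000 for the same reason). A small sketch for converting between those values and Python datetimes, and for computing the start of the 30-day window; the helper names are hypothetical and the functions expect timezone-aware datetimes.

```python
import datetime

# Unix epoch as a timezone-aware UTC datetime
EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def to_hs_millis(dt):
    """Convert a timezone-aware datetime to epoch milliseconds (exact integer arithmetic)."""
    return (dt - EPOCH) // datetime.timedelta(milliseconds=1)


def from_hs_millis(ms):
    """Convert epoch milliseconds (int or numeric string) to a UTC datetime."""
    return EPOCH + datetime.timedelta(milliseconds=int(ms))


def window_start_millis(now=None, days=30):
    """Epoch milliseconds for `days` days before `now` (default: the current UTC time)."""
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    return to_hs_millis(now - datetime.timedelta(days=days))
```

Dividing a timedelta by a one-millisecond timedelta keeps the result an exact integer, which avoids float rounding on large timestamps.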

Please note: there are 3 fields here to pay close attention to –

  • “has-more” – tells you whether there are more contacts that you can pull.
  • “vid-offset” – lets you know where you are in the list of contacts by contact vid.
  • “time-offset” – lets you know where you are in the list of contacts by time.
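In practice these three fields drive the next request: when "has-more" is true, the "vid-offset" and "time-offset" values go back as the vidOffset and timeOffset query parameters, which is what the paginated function near the end of this article does. A minimal sketch of that single step as a pure function; next_page_params is a hypothetical name.

```python
def next_page_params(response, params):
    """Return query parameters for the next page, or None when there is no next page.

    `response` is the decoded JSON of a recently updated/created contacts call;
    `params` are the parameters used for that call and are left unmodified.
    """
    if not response.get("has-more"):
        return None
    next_params = dict(params)
    # vid-offset and time-offset mark where this page ended
    next_params["vidOffset"] = response["vid-offset"]
    next_params["timeOffset"] = response["time-offset"]
    return next_params
```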

The Python function for the HubSpot recently updated contacts API looks like the code below. It extracts only the vid and name fields for each contact. You can extract more data as needed.

def get_recent_updated_contact(api_key, count):
    try:
        # To get recently updated contacts (first page only)
        url = "https://api.hubapi.com/contacts/v1/lists/recently_updated/contacts/recent"
        params = {"count": count, "hapikey": api_key}

        r = requests.get(url, params=params)
        r.raise_for_status()
        response_dict = r.json()

        # to get contact data
        contact_list = response_dict['contacts']

        rows = []
        for cnt in contact_list:
            # You can print cnt to see all available fields you get from this API
            # print("\n", cnt)
            prop = cnt.get("properties", {})
            # .get() gives None for a missing field instead of reusing the previous contact's value
            rows.append({
                "vid": cnt.get("vid"),
                "first_name": prop.get("firstname", {}).get("value"),
                "last_name": prop.get("lastname", {}).get("value"),
            })

        # build the DataFrame once; DataFrame.append was removed in pandas 2.0
        return pandas.DataFrame(rows, columns=['vid', 'first_name', 'last_name'])
    except Exception:
        print("\n##get_recent_update_contact Failed", sys.exc_info())
        return None

5. HubSpot Recent Created Contact API:

Get recently created contacts – this HubSpot API returns all contacts that have been recently created in the given account. The response is a paginated list, with a maximum of 100 contacts per page.

Please note: there are 3 fields here to pay close attention to –

  • “has-more” – tells you whether there are more contacts that you can pull.
  • “vid-offset” – lets you know where you are in the list of contacts by contact vid.
  • “time-offset” – lets you know where you are in the list of contacts by time.

Remember that the response from this API is sorted in descending order by create date, i.e. the most recently created contact is returned first.

The Python function for the HubSpot recently created contacts API looks like the code below. It extracts only the vid and name fields for each contact. You can extract more data as needed.

def get_recent_created_contact(api_key, count):
    try:
        # To get recently created contacts (first page only, newest first)
        url = "https://api.hubapi.com/contacts/v1/lists/all/contacts/recent"
        params = {"count": count, "hapikey": api_key}

        r = requests.get(url, params=params)
        r.raise_for_status()
        response_dict = r.json()

        # to get contact data
        contact_list = response_dict['contacts']

        rows = []
        for cnt in contact_list:
            # You can print cnt to see all available fields you get from this API
            # print("\n", cnt)
            prop = cnt.get("properties", {})
            # missing vid / firstname / lastname become None
            rows.append({
                "vid": cnt.get("vid"),
                "first_name": prop.get("firstname", {}).get("value"),
                "last_name": prop.get("lastname", {}).get("value"),
            })

        # build the DataFrame once; DataFrame.append was removed in pandas 2.0
        return pandas.DataFrame(rows, columns=['vid', 'first_name', 'last_name'])
    except Exception:
        print("\n##get_recent_created_contact Failed", sys.exc_info())
        return None

6. HubSpot Contact by VID API:

Get contact by VID: this HubSpot API returns information about a single contact, identified by its VID, in a given account. What is a contact VID? It is the contact's unique ID, stored in a field called “vid”, which stands for “Visitor ID”.
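In the v1 responses used throughout this article, each entry under "properties" is an object whose current value sits under a "value" key, which is why the code reads prop['firstname']['value']. A small helper for that lookup that returns a default instead of raising when a property is missing; get_property is a hypothetical name, not part of any HubSpot client library.

```python
def get_property(contact, name, default=None):
    """Return contact["properties"][name]["value"], or `default` if any level is missing."""
    prop = contact.get("properties", {}).get(name)
    if not isinstance(prop, dict):
        return default
    return prop.get("value", default)
```

For example, with contact = {"vid": 101, "properties": {"firstname": {"value": "Ada"}}}, get_property(contact, "firstname") returns "Ada" and get_property(contact, "lastname") returns None.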

The Python function for the HubSpot contact-by-VID API looks like the code below. The fields it extracts are not the only data this API returns; there are many more, and the code demonstrates only a selected few. You can extract more data as needed.

def contact_insight_extraction(api_key, contact_vid_list):
    columns = ["vid", "first_conversion_date", "lead_score",
               "recent_conversion_date", "lifecycle_stage", "form_id", "source", "form_type",
               "utm_source", "utm_medium", "utm_campaign", "utm_link"]

    def prop_value(properties, name, default=None):
        # each v1 property looks like {"value": ...}; a missing property gives the default
        return properties.get(name, {}).get("value", default)

    def ms_to_date(value):
        # HubSpot dates are epoch milliseconds; keep only the UTC calendar date (at midnight)
        if not value:
            return None
        dt = datetime.datetime(1970, 1, 1) + datetime.timedelta(milliseconds=int(value))
        return dt.replace(hour=0, minute=0, second=0, microsecond=0)

    try:
        rows = []
        for vid in contact_vid_list:
            url = "https://api.hubapi.com/contacts/v1/contact/vid/" + str(vid) + "/profile"
            try:
                # making api call
                r = requests.get(url, params={"hapikey": api_key})
                r.raise_for_status()
                response_dict = r.json()
                properties = response_dict["properties"]

                # raises KeyError (caught below) if the contact has no source
                contact_source = properties["hs_analytics_source"]["value"]
                if contact_source in ["OFFLINE", "offline"]:
                    continue  # offline contacts are skipped

                temp_dict = {"vid": vid, "source": contact_source}

                # latest form submission; note the hyphenated keys "form-id" and "form-type"
                submissions = response_dict.get("form-submissions") or []
                last_submission = submissions[-1] if submissions else {}
                temp_dict["form_id"] = last_submission.get("form-id")
                temp_dict["form_type"] = last_submission.get("form-type")

                temp_dict["first_conversion_date"] = ms_to_date(prop_value(properties, "first_conversion_date"))
                temp_dict["recent_conversion_date"] = ms_to_date(prop_value(properties, "recent_conversion_date"))
                temp_dict["lifecycle_stage"] = prop_value(properties, "lifecyclestage")
                temp_dict["lead_score"] = int(prop_value(properties, "hubspotscore") or 0)

                # initialise UTM fields to None so values never carry over between contacts
                utm_link = utm_source = utm_medium = utm_campaign = None
                first_url = prop_value(properties, "hs_analytics_first_url")
                if first_url:
                    o = parse.urlparse(first_url)
                    query_url = parse.parse_qs(o.query)
                    utm_link = o._replace(query="").geturl()
                    utm_campaign = query_url.get('utm_campaign', [None])[0]
                    utm_source = query_url.get('utm_source', [None])[0]
                    utm_medium = query_url.get('utm_medium', [None])[0]

                # logic to catch contacts coming from capterra
                first_referrer = prop_value(properties, "hs_analytics_first_referrer")
                if first_referrer and "capterra" in first_referrer:
                    utm_source = "capterra"

                temp_dict.update({"utm_link": utm_link, "utm_campaign": utm_campaign,
                                  "utm_source": utm_source, "utm_medium": utm_medium})
                # add this contact's data to the result rows
                rows.append(temp_dict)
            except Exception:
                print("Failed to get the Source!!! for contact vid/ID :", vid, sys.exc_info())

        # build the DataFrame once; DataFrame.append was removed in pandas 2.0
        return pandas.DataFrame(rows, columns=columns)
    except Exception:
        print("\n###get_contact_by_vid Failed : ", sys.exc_info())
        return None
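The UTM handling inside the function above can be pulled out and tried on its own. A minimal sketch of the same idea with urllib.parse: split the first-visit URL into the link without its query string plus the utm_source, utm_medium, and utm_campaign values. split_utm is a hypothetical name and, like the original, it keeps only the first value of each parameter.

```python
from urllib import parse


def split_utm(first_url):
    """Return (link_without_query, utm_source, utm_medium, utm_campaign) for a URL."""
    parts = parse.urlparse(first_url)
    # parse_qs maps each key to a list of values, e.g. {"utm_source": ["google"]}
    query = parse.parse_qs(parts.query)

    def first(key):
        # first value of a query parameter, or None when it is absent
        return query.get(key, [None])[0]

    link = parts._replace(query="").geturl()
    return link, first("utm_source"), first("utm_medium"), first("utm_campaign")
```

For example, split_utm("https://example.com/pricing?utm_source=google&utm_medium=cpc&utm_campaign=spring") returns ("https://example.com/pricing", "google", "cpc", "spring").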
 

Pause again and think about what comes next. There are two ways to call the function above, and you will see an example of each below; both snippets go inside the try block of hs_main.py. We recommend option 2 for faster, automated processing.

1. By manually defining a list of contact VIDs:

vid_list = ['4793951','4808951','4136301']
contact_insight_df = contact_insight_extraction(api_key,vid_list)
print("contact_insight_df :\n",contact_insight_df)

2. By passing the contact VIDs returned by one of the previous contact APIs (i.e. all contacts, recently updated, or recently created):

recent_updated_contact_df = get_recent_updated_contact(api_key,count)
print("recent_updated_contact_df :\n",recent_updated_contact_df)

contact_insight_df = contact_insight_extraction(api_key,recent_updated_contact_df['vid'])
print("contact_insight_df :\n",contact_insight_df)
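If you feed VIDs from more than one of the list APIs (for example recently updated and recently created), the same contact can appear in both and would be fetched twice. A minimal sketch of an order-preserving de-duplication step before calling contact_insight_extraction; unique_vids is a hypothetical helper, and it also drops None values.

```python
def unique_vids(*vid_lists):
    """Merge VID lists (lists or pandas Series), keeping first-seen order and dropping None/duplicates."""
    seen = set()
    result = []
    for vids in vid_lists:
        for vid in vids:
            if vid is None or vid in seen:
                continue
            seen.add(vid)
            result.append(vid)
    return result
```

For example, unique_vids(recent_updated_contact_df['vid'], recent_created_contact_df['vid']) yields one list to pass as contact_vid_list.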

HubSpot provides many more contact APIs for other kinds of contact-related data extraction; see the HubSpot API documentation.

After adding all the functions above and calling them from the main file, hs_main.py looks something like the code below.

#!/usr/local/bin/python3
# command to run this code $ python3 ./python/hs_main.py
import sys
import datetime
import json

from get_hs_data import *

if __name__ == '__main__':
    try:
        timestamp = datetime.datetime.now().strftime('%Y-%m-%d : %H:%M')
        print("DATE : ", timestamp, "\n")
        print("HubSpot data extraction process Started")

        # reading the credentials json file (adjust the path to where you saved it)
        hs_cred_file = "./source/hs_cred.json"
        with open(hs_cred_file, 'r') as f:
            hs_cred_json = json.load(f)

        api_key = hs_cred_json["api_key"]
        count = hs_cred_json["count"]
        # call the function that wraps the HubSpot All Contacts API
        all_contact_df = get_all_contact(api_key, count)
        print("all_contact_df :\n", all_contact_df)

        recent_updated_contact_df = get_recent_updated_contact(api_key, count)
        print("recent_updated_contact_df :\n", recent_updated_contact_df)

        recent_created_contact_df = get_recent_created_contact(api_key, count)
        print("recent_created_contact_df :\n", recent_created_contact_df)

        # fetch profile details for every contact returned by the recently updated API
        contact_insight_df = contact_insight_extraction(api_key, recent_updated_contact_df['vid'])
        print("contact_insight_df :\n", contact_insight_df)

        print("HS_MAIN : data extraction Process Finished \n")
    except Exception:
        print("HS_MAIN : data extraction processing Failed !!!!:", sys.exc_info())
 
 

Calling HubSpot API with Access Token:

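To show how to make API calls with an access_token, below is the Python function for the recently updated contacts API. It is the same one you saw above; the only difference is that here we authenticate with access_token rather than api_key. Because it has the same name, use it in place of the api_key version in get_hs_data.py rather than alongside it.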

def get_recent_updated_contact(access_token, count):
    try:
        # To get recently updated contacts (first page only)
        url = "https://api.hubapi.com/contacts/v1/lists/recently_updated/contacts/recent"
        parameter_dict = {'count': count}
        # the access token goes in the Authorization header instead of the URL
        headers = {"Authorization": "Bearer " + access_token}

        # requests encodes parameter_dict into the query string
        r = requests.get(url, params=parameter_dict, headers=headers)
        r.raise_for_status()
        response_dict = r.json()

        rows = []
        for cnt in response_dict['contacts']:
            # You can print cnt to see all available fields you get from this API
            # print("\n", cnt)
            prop = cnt.get("properties", {})
            # missing vid / firstname / lastname become None
            rows.append({
                "vid": cnt.get("vid"),
                "first_name": prop.get("firstname", {}).get("value"),
                "last_name": prop.get("lastname", {}).get("value"),
            })

        # build the DataFrame once; DataFrame.append was removed in pandas 2.0
        return pandas.DataFrame(rows, columns=['vid', 'first_name', 'last_name'])
    except Exception:
        print("\n##get_recent_update_contact Failed", sys.exc_info())
        return None
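The function above sends the access token in an Authorization: Bearer header instead of the hapikey query parameter. That is the approach to prefer: HubSpot sunset API key (hapikey) authentication on November 30, 2022, so the api_key versions above no longer authenticate, and an access token (for example from a private app) is needed instead. A minimal standard-library sketch of building such a request without sending it; build_contacts_request is a hypothetical name, and urllib.request stands in for requests only to keep the example dependency-free.

```python
from urllib.parse import urlencode
from urllib.request import Request


def build_contacts_request(access_token, count):
    """Build (but do not send) a GET request for recently updated contacts."""
    url = ("https://api.hubapi.com/contacts/v1/lists/recently_updated/contacts/recent?"
           + urlencode({"count": count}))
    # the token travels in a header, so it is not part of the URL
    return Request(url, headers={"Authorization": "Bearer " + access_token})
```

To actually send it, pass the request to urllib.request.urlopen and decode the JSON body.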

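Calling HubSpot API with has-more flag check and end_date constraint:

This is how the function above looks when it keeps requesting pages while “has-more” is true and stops once “time-offset” goes past end_date. Because end_date is compared directly with “time-offset”, pass it as a Unix timestamp in milliseconds. Note that the example below uses the access_token technique to make HubSpot calls; you can use the api_key call as well.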

def get_recent_updated_contact(access_token, count, end_date):
    # end_date: Unix timestamp in milliseconds, the same unit as "time-offset"
    try:
        # To get recently updated contacts, page by page
        url = "https://api.hubapi.com/contacts/v1/lists/recently_updated/contacts/recent"
        parameter_dict = {'count': count}
        headers = {"Authorization": "Bearer " + access_token}
        rows = []

        has_more = True
        while has_more:
            r = requests.get(url, params=parameter_dict, headers=headers)
            r.raise_for_status()
            response_dict = r.json()
            has_more = response_dict['has-more']

            # convert only this page's contacts; earlier pages are already in rows
            for cnt in response_dict['contacts']:
                prop = cnt.get("properties", {})
                rows.append({
                    "vid": cnt.get("vid"),
                    "first_name": prop.get("firstname", {}).get("value"),
                    "last_name": prop.get("lastname", {}).get("value"),
                })

            # tell the next request where this page ended
            parameter_dict['vidOffset'] = response_dict['vid-offset']
            time_offset = response_dict['time-offset']
            parameter_dict['timeOffset'] = time_offset

            if time_offset < end_date:
                print('End date reached')
                break

        # build the DataFrame once; DataFrame.append was removed in pandas 2.0
        return pandas.DataFrame(rows, columns=['vid', 'first_name', 'last_name'])
    except Exception:
        print("\n##get_recent_update_contact Failed", sys.exc_info())
        return None
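A minimal sketch of the same has-more / end-date paging logic, separated from the HTTP call through a hypothetical fetch_page(params) callable (for example a wrapper around requests.get(...).json()) so it can be tried with canned responses. collect_recent_contacts is also a hypothetical name; end_date_ms is in epoch milliseconds, and the function returns the raw contact dicts rather than a DataFrame.

```python
def collect_recent_contacts(fetch_page, end_date_ms, count=100):
    """Page through recently updated contacts until has-more is false or end_date_ms is passed.

    fetch_page (hypothetical) takes a dict of query parameters and returns the decoded JSON.
    """
    params = {"count": count}
    contacts = []
    while True:
        response = fetch_page(dict(params))
        contacts.extend(response.get("contacts", []))
        if not response.get("has-more"):
            break
        # the endpoint scrolls back in time, so once time-offset is older than the end date we stop
        if response["time-offset"] < end_date_ms:
            break
        params["vidOffset"] = response["vid-offset"]
        params["timeOffset"] = response["time-offset"]
    return contacts
```

Keeping the HTTP call outside the loop lets you swap api_key and access_token authentication without touching the paging logic.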
 

For any suggestions or doubts ~ Get In Touch
