Fresh LinkedIn Profile Data

免费增值
通过 FreshData | 10 天前更新 | Data
人气

9.9 / 10

延迟

3,523ms

服务等级

100%

Health Check

100%

返回全部教程 (9)

Sample Python Code for Search Employees Endpoint

# Search for 100 current employees at Apple and save the basic information to a CSV file.


import requests
import csv
import time
from math import ceil

# Replace the placeholder below with your own RapidAPI key.
API_KEY = "YOUR_API_KEY"
host = "fresh-linkedin-profile-data.p.rapidapi.com"

# Search endpoint: ask for 100 people currently working at Apple
# (company id 162479).
search_url = f"https://{host}/search-employees"
search_payload = dict(current_company_ids=[162479], limit=100)
search_headers = {
    "content-type": "application/json",
    "X-RapidAPI-Key": API_KEY,
    "X-RapidAPI-Host": host,
}

# Status-check endpoint: only the RapidAPI authentication headers are sent.
check_url = f"https://{host}/check-search-status"
check_headers = {"X-RapidAPI-Key": API_KEY, "X-RapidAPI-Host": host}

# Result-download endpoint: same authentication headers, kept as a separate
# dict so changing one set of headers never affects the other.
get_result_url = f"https://{host}/get-search-results"
get_result_headers = dict(check_headers)

# Columns copied from every returned record into the CSV, in output order.
data_fields = [
    "linkedin_url",
    "first_name",
    "last_name",
    "full_name",
    "location",
    "headline",
    "about",
    "company",
    "job_title",
    "company_domain",
]

#Init a search employees request, check status and get result when done
def full_search_employees(poll_interval=300, request_timeout=30, filename="result.csv"):
    """Run an employee search end to end and save the results as CSV.

    Posts ``search_payload`` to ``search_url``, polls ``check_url`` until the
    reported status is ``'done'``, downloads every result page from
    ``get_result_url`` and writes the ``data_fields`` columns of each record
    to ``filename``. Polling has no upper bound: the function keeps checking
    until the status becomes ``'done'``.

    Args:
        poll_interval: Seconds to sleep before each status check
            (default 300, i.e. five minutes).
        request_timeout: Seconds to wait on each HTTP request before
            ``requests`` raises a timeout error.
        filename: Path of the CSV file to create or overwrite.

    Raises:
        SystemExit: If the initial search request does not return HTTP 200
            or its response carries no ``request_id``.
    """
    # presumably the results endpoint serves 100 records per page, as the
    # original page-count arithmetic implies -- confirm against the API docs
    results_per_page = 100

    #Init search
    search_response = requests.post(
        search_url,
        json=search_payload,
        headers=search_headers,
        timeout=request_timeout,
    )
    if search_response.status_code != 200:
        # An error body is not guaranteed to be JSON; fall back to raw text
        # so the failure reason is still shown instead of a decode traceback.
        try:
            message = search_response.json().get('message')
        except ValueError:
            message = search_response.text
        print(message)
        # quit() is a site-module convenience for the interactive shell and
        # exits with status 0; raise SystemExit with a failure code instead.
        raise SystemExit(1)

    request_id = search_response.json().get('request_id')
    if request_id is None:
        # Without an id there is nothing to poll for.
        print("Search response did not contain a request_id")
        raise SystemExit(1)

    #Check
    while True:
        time.sleep(poll_interval)
        check_response = requests.get(
            check_url,
            headers=check_headers,
            params={'request_id': request_id},
            timeout=request_timeout,
        )
        if check_response.status_code != 200:
            # Keep polling, but make the failed check visible.
            print(f"Status check returned HTTP {check_response.status_code}; retrying")
            continue

        status_info = check_response.json()  # parse the body only once
        if status_info.get('status') == 'done':
            break

    #Done
    # A missing count is treated as zero so ceil() never receives None.
    total_scraped = status_info.get('total_employees') or 0
    total_result_pages = ceil(total_scraped / results_per_page)

    #Get all results
    data = []
    for page in range(1, total_result_pages + 1):
        get_result_response = requests.get(
            get_result_url,
            headers=get_result_headers,
            params={'request_id': request_id, 'page': page},
            timeout=request_timeout,
        )
        if get_result_response.status_code != 200:
            # Report the gap instead of silently producing a short CSV.
            print(f"Skipping result page {page}: HTTP {get_result_response.status_code}")
            continue

        # "or []" guards against a response without a 'data' list.
        for rs in get_result_response.json().get('data') or []:
            data.append({f: rs.get(f) for f in data_fields})

    #save
    with open(filename, 'w', encoding='utf-8', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=data_fields)
        writer.writeheader()
        writer.writerows(data)


# Run the complete search-and-export workflow only when executed as a script.
if __name__ == "__main__":
    full_search_employees()