Contents
Pipedrive API と Salesforce REST API の基礎
この章では、両方の CRM が提供する REST API の全体像と認証方式を整理します。
まず Pipedrive はシンプルなトークン認証で手軽に始められる点が特徴です。一方 Salesforce は OAuth2 による高度な認可機構とバージョニングが必須になるため、実装前にフローを正確に把握しておく必要があります。以下でエンドポイント・認証手順・代表的リクエスト例を詳述します。
Pipedrive API 概要(エンドポイント・認証方式)
Pipeddrive の API は 1.0 系と 2.0 系が混在していますが、現在推奨されるのは v1 系統です。
- ベース URL:https://api.pipedrive.com/v1/(例: /deals、/persons)
- 認証方式:ユーザー設定画面で発行できる API トークン をクエリパラメータ api_token もしくは HTTP ヘッダー Authorization: Bearer <token> に付与します。OAuth2 も利用可能ですが、シンプルさが求められる場面ではトークン方式で十分です。
- レスポンス形式:すべて JSON。成功時は { "success": true, "data": [...] } の形で data 配列にレコードが格納され、エラー時は HTTP ステータスと error フィールドで詳細が返ります。
代表的なリクエスト例
| メソッド | エンドポイント | 説明 |
|---|---|---|
| GET | /deals?api_token=YOUR_TOKEN |
全 Deal の一覧取得(ページングは start パラメータ) |
| POST | /persons?api_token=YOUR_TOKEN |
新規コンタクト作成(JSON ボディに name, email 等) |
| PUT | /deals/123?api_token=YOUR_TOKEN |
Deal ID 123 の情報更新(部分更新可能) |
Salesforce REST API 概要(OAuth2 フロー・主要リソース)
Salesforce の REST API は インスタンスごとのベース URL と API バージョン が組み合わさった形でアクセスします。
- ベース URL 例:https://yourInstance.salesforce.com/services/data/v58.0/(vXX.X 部分はリリースに合わせて変更)
- 認証方式:OAuth2 の「Authorization Code」フローまたは「Refresh Token」フローを使用し、取得したアクセストークンを Authorization: Bearer <access_token> ヘッダーで送ります。トークンには有効期限とスコープが設定されるため、長期運用ではリフレッシュトークンの管理が必須です。
- 主要エンドポイント:/sobjects/ 配下に標準オブジェクト(Opportunity, Contact など)やカスタムオブジェクトが配置されます。
代表的なリクエスト例
| メソッド | エンドポイント | 説明 |
|---|---|---|
| GET | /sobjects/Opportunity/001xx000003DGbYAAW |
指定 Opportunity の詳細取得 |
| POST | /sobjects/Contact/ |
新規 Contact 作成(JSON ボディに必須項目) |
| PATCH | /sobjects/Opportunity/001xx000003DGbYAAW |
部分更新(例: StageName の変更) |
ハイブリッド連携シナリオとデータマッピング
CRM 間のデータ同期は、営業プロセス全体を俯瞰した上でキー項目と変換ロジックを明文化することが成功の鍵です。ここでは実務で頻出する 3 パターンのシナリオを示し、それぞれに対応したマッピング表作成のポイントを解説します。
代表的連携シナリオ例(案件→商談・コンタクト同期・ステージ更新)
以下の表は「Deal → Opportunity」「Contact 同期」「Stage 双方向更新」の 3 パターンに共通する項目マッピングと変換ロジックをまとめたものです。実装前に必ずビジネス要件と照らし合わせてカスタマイズしてください。
| Pipedrive 項目 | Salesforce フィールド | 変換ロジック |
|---|---|---|
deal.title |
Opportunity.Name |
文字列をそのまま転送 |
deal.value |
Amount |
数値 → Decimal(18,2) にキャスト |
deal.user_id |
OwnerId |
ユーザー ID マッピングテーブルで変換(例: Pipedrive の内部ユーザ ↔︎ Salesforce の User.Id) |
person.email |
Contact.Email |
同一メールアドレスで Upsert(存在すれば更新、なければ作成) |
deal.stage_id |
Opportunity.StageName |
ステージコード表(Pipedrive → Salesforce 名称)に基づきマッピング |
シナリオの流れ(簡易フローチャート)
- Deal 作成トリガー(Pipedrive)→ 2. キー項目(deal.id)で外部 ID を検索 → 3. 存在しなければ Opportunity 新規作成、既存なら PATCH 更新 → 4. Contact のメールアドレスで同様の Upsert 実行 → 5. ステージ変更時に相互同期。
データマッピング表の作成ポイント
マッピング表は「キー」「必須」「変換」の 3 カラムをベースにすると、レビューやテストが円滑になります。特に キー項目の一意性 と 必須項目の事前バリデーション はレコード不整合を防ぐ重要ポイントです。
| 項目カテゴリ | Pipedrive フィールド | Salesforce フィールド | 変換ロジック |
|---|---|---|---|
| キー | deal.id |
Opportunity.ExternalId__c |
文字列そのまま保存、重複チェックは外部 ID で実施 |
| 必須 | deal.title |
Name |
空文字の場合は「未設定」へ置換 |
| オプション | person.phone |
Phone |
ハイフン除去・全角→半角変換、国番号が無い場合はデフォルト +81 を付与 |
実装上のヒント
- スプレッドシートや Confluence にマッピング表を保存し、Git でバージョン管理することで変更履歴と影響範囲を追跡できます。
- 変換ロジックは可能な限り 関数化(例: normalize_phone())してコードベースに組み込むとテストが容易です。
ノーコードツールでの実装手順
ノーコードプラットフォームは開発工数を大幅に削減できますが、設定ミスやレートリミットに注意が必要です。ここでは Zapier と Make (Integromat) の具体的な設定フローと留意点を示します。
Zapier 設定フローと注意点
Zapier は「トリガー → アクション」のシンプル構成で素早く PoC が可能です。以下の手順は標準コネクタを使用した場合のベストプラクティスです。
- Trigger(Pipedrive)
- 「New Deal」または「Updated Deal」を選択し、取得した API トークンで認証します。
- Filter(任意)
- 条件式
Stage != "Negotiation"などで不要レコードを除外できます。 - Action(Salesforce)
- 「Create Record」→ オブジェクトは
Opportunityを選択。 - Field Mapping
- Deal の
title→ Opportunity のName、value→Amountなどを UI 上でドラッグ&ドロップします。
注意すべき点
- レートリミット:Zapier は 1 分間に最大 100 リクエスト(無料プラン)しか送れません。大量データはバッチ化して時間帯をずらすか、Make に切り替えることが推奨されます。
- エラーハンドリング:失敗したタスクは「Task History」から手動で再実行する必要があります。重要レコードは Slack 通知やメールアラートと併用し、担当者が即座に対処できる体制を整えてください。
- データ型の自動変換:Zapier は文字列 → 数値への暗黙的変換を行わないケースがあります。数値項目は事前に「Formatter」ステップでキャストしておくとエラーが減ります。
Make (Integromat) の設定ポイントと制限事項
Make はビジュアルシナリオと高度なルーティングが特徴です。以下は複雑変換を含む典型的なフロー例です。
- Trigger(Watch Deals)
- Pipedrive コネクタで「New/Updated Deal」をポーリング取得。
- Router
stage_idに応じて分岐し、Negotiation ステージのみ次工程へ送ります。- OAuth2 Token モジュール(Salesforce)
- 「Refresh Access Token」モジュールでアクセストークンを取得し、変数に保存します。
- HTTP Request(Create Opportunity)
POST /services/data/v58.0/sobjects/Opportunity/に JSON ボディを送信。- Error Handler
- ステータス 429(レートリミット)なら 30 秒待機後再試行、他は Slack 通知でエスカレーションします。
制限事項と回避策
- 無料プランの操作上限:月間 1,000 操作までしか利用できないため、本番環境では必ず有料プランへ移行してください。
- バルクアップサート:Make の標準モジュールは単一レコード処理が前提です。大量データを扱う場合は Salesforce の「Composite」API を自作 HTTP リクエストで呼び出すか、Bulk API 用のカスタムスクリプトと組み合わせる必要があります。
- 実行時間制限:1 シナリオあたり最大 30 分までしか走らないため、長時間バッチは複数シナリオに分割してスケジュールする設計が求められます。
自前スクリプトによる API 連携実装例
コードベースでの実装は柔軟性とテスト容易性を提供します。ここでは Python と Node.js のサンプルを示し、認証・取得・登録の流れとエラーハンドリングのポイントを解説します。
Python サンプルコード解説(認証・データ取得・登録)
この例は requests と simple-salesforce を組み合わせ、環境変数からシークレットを取得しつつ、Deal → Opportunity の同期を行います。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# 1. 必要ライブラリのインストール # pip install requests simple-salesforce boto3 import os, json, time, random, logging import requests from simple_salesforce import Salesforce, SalesforceMalformedRequest import boto3 # ------------------------------------------------- # ロガー設定(JSON 形式で CloudWatch 等に出力可) # ------------------------------------------------- logger = logging.getLogger('crm_sync') handler = logging.StreamHandler() formatter = logging.Formatter( '{"timestamp":"%(asctime)s","level":"%(levelname)s","msg":%(message)s}' ) handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) # ------------------------------------------------- # シークレット取得(AWS Secrets Manager を想定) # ------------------------------------------------- def get_secret(name): client = boto3.client('secretsmanager') resp = client.get_secret_value(SecretId=name) return json.loads(resp['SecretString']) secret = get_secret('crm-integration') # JSON {pipedrive_token, sf_client_id, ...} PIPEDRIVE_TOKEN = secret['pipedrive_token'] SF_USERNAME = secret['sf_username'] SF_PASSWORD = secret['sf_password'] # パスワード + security token SF_SECURITY_TOKEN = secret['sf_security_token'] # ------------------------------------------------- # Pipedrive Deal 取得(ページング対応) # ------------------------------------------------- def fetch_deals(start=0, limit=100): url = f'https://api.pipedrive.com/v1/deals?start={start}&limit={limit}&api_token={PIPEDRIVE_TOKEN}' resp = requests.get(url) resp.raise_for_status() data = resp.json() return data['data'] or [], data.get('additional_data', {}).get('pagination', {}) def iter_all_deals(): start = 0 while True: deals, pagination = fetch_deals(start=start) for d in deals: yield d if not pagination.get('more_items_in_collection'): break start = pagination['next_start'] # ------------------------------------------------- # Salesforce 接続(simple-salesforce が自動で OAuth2 を処理) # ------------------------------------------------- def sf_login(): return Salesforce( username=SF_USERNAME, password=SF_PASSWORD, security_token=SF_SECURITY_TOKEN, client_id='crm-integration', domain='login' # 本番は login、Sandbox は test ) # ------------------------------------------------- # 冪乗バックオフ付きリトライデコレータ # ------------------------------------------------- def retry(func): def wrapper(*args, **kwargs): wait = 1 for attempt in range(5): try: return func(*args, **kwargs) except (requests.HTTPError, SalesforceMalformedRequest) as e: status = getattr(e.response, 'status_code', None) if status and (status >= 500 or status == 429): logger.warning(f'Attempt {attempt+1} failed ({status}); retry in {wait}s') time.sleep(wait + random.uniform(0, 0.5)) wait *= 2 else: logger.error(f'Non‑retriable error: {e}') raise raise RuntimeError('Max retries exceeded') return wrapper # ------------------------------------------------- # Deal → Opportunity 同期ロジック # ------------------------------------------------- @retry def sync_deal(deal, sf): opp = { 'Name': deal['title'], 'Amount': float(deal.get('value') or 0), 'CloseDate': deal.get('expected_close_date') or '2026-12-31', 'StageName': 'Prospecting', 'ExternalId__c': str(deal['id']) } try: sf.Opportunity.create(opp) logger.info(f'Deal {deal["id"]} synced as Opportunity.') except SalesforceMalformedRequest as e: logger.error(f'Failed to sync Deal {deal["id"]}: {e.content}') if __name__ == '__main__': sf = sf_login() for deal in iter_all_deals(): sync_deal(deal, sf) |
ポイント解説
- シークレットは外部ストアに保管し、コード内には決して平文を書かない。
- iter_all_deals は Pipedrive のページングを自動で回すので数千件でも安全に取得可能。
- retry デコレータは 5 回まで指数バックオフで再試行し、永続的な障害は例外として上位へ伝搬します。
Node.js サンプルコード解説(同上)
Node.js 環境では非同期処理がデフォルトになるため、Promise/async‑await を活用した実装例を示します。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
// 1. 必要パッケージのインストール // npm install axios jsforce dotenv @aws-sdk/client-secrets-manager require('dotenv').config(); const { SecretsManagerClient, GetSecretValueCommand } = require('@aws-sdk/client-secrets-manager'); const axios = require('axios'); const { Connection } = require('jsforce'); // ------------------------------------------------- // ロガー(JSON 形式で標準出力へ) // ------------------------------------------------- function log(level, msg) { console.log(JSON.stringify({ timestamp: new Date().toISOString(), level, msg })); } // ------------------------------------------------- // Secrets Manager からシークレット取得 // ------------------------------------------------- async function getSecret(name) { const client = new SecretsManagerClient({}); const command = new GetSecretValueCommand({ SecretId: name }); const response = await client.send(command); return JSON.parse(response.SecretString); } // ------------------------------------------------- (async () => { const secret = await getSecret('crm-integration'); const PIPEDRIVE_TOKEN = secret.pipedrive_token; const SF_USERNAME = secret.sf_username; const SF_PASSWORD = secret.sf_password; // パスワード + security token // ------------------------------------------------- // Pipedrive Deal 取得(ページング対応) // ------------------------------------------------- async function fetchDeals(start = 0, limit = 100) { const url = `https://api.pipedrive.com/v1/deals?start=${start}&limit=${limit}&api_token=${PIPEDRIVE_TOKEN}`; const res = await axios.get(url); return res.data; } async function* allDeals() { let start = 0; while (true) { const { data, additional_data } = await fetchDeals(start); for (const d of data) yield d; if (!additional_data?.pagination?.more_items_in_collection) break; start = additional_data.pagination.next_start; } } // ------------------------------------------------- // Salesforce 接続(jsforce が OAuth2 パスワードフローを内部処理) // ------------------------------------------------- const conn = new Connection({ loginUrl: 'https://login.salesforce.com' }); await conn.login(SF_USERNAME, SF_PASSWORD); log('info', 'Salesforce login succeeded'); // ------------------------------------------------- // 冪乗バックオフ付きリトライ関数 // ------------------------------------------------- async function retry(fn, attempts = 5) { let wait = 1000; for (let i = 0; i < attempts; i++) { try { return await fn(); } catch (e) { const status = e.response?.status; if (status && (status >= 500 || status === 429)) { log('warn', `Attempt ${i+1} failed (${status}); retry after ${wait}ms`); await new Promise(r => setTimeout(r, wait + Math.random() * 500)); wait *= 2; } else { throw e; } } } throw new Error('Max retries exceeded'); } // ------------------------------------------------- // Deal → Opportunity 同期ロジック // ------------------------------------------------- async function syncDeal(deal) { const opp = { Name: deal.title, Amount: Number(deal.value || 0), CloseDate: deal.expected_close_date || '2026-12-31', StageName: 'Prospecting', ExternalId__c: String(deal.id) }; await retry(() => conn.sobject('Opportunity').create(opp)); log('info', `Deal ${deal.id} synced`); } // ------------------------------------------------- // メイン実行ループ // ------------------------------------------------- for await (const deal of allDeals()) { try { await syncDeal(deal); } catch (e) { log('error', `Failed to sync Deal ${deal.id}: ${e.message}`); } } })(); |
ポイント解説
- dotenv と Secrets Manager の併用でローカル開発と本番環境のシークレット管理を統一。
- retry 関数は指数バックオフ+ジッターを実装し、レートリミットや 5xx 系エラーに対して自動再試行します。
- allDeals はジェネレーターで実装したため、メモリ消費が最小限に抑えられ、大規模データでもスムーズに走ります。
認証・トークン管理、エラーハンドリング、運用ベストプラクティス
本章では 認証情報の安全な保管方法 と 障害時の対策 を中心に、長期的に安定した連携基盤を構築するための実装指針をまとめます。
認証フローとアクセストークンの安全な保管方法
| 項目 | 推奨保存先 | アクセス制御例 |
|---|---|---|
| Pipedrive API Token | AWS Secrets Manager(キー: pipedrive/api-token) |
IAM ロール crm-integration に secretsmanager:GetSecretValue のみ付与 |
| Salesforce Refresh Token | 同上(キー: salesforce/refresh-token) |
同上、さらに KMS キーで暗号化しローテーションを有効化 |
| クライアントシークレット等 | HashiCorp Vault も可 | ポリシーで「read only」権限のみ付与 |
実装ヒント
- Python:boto3.client('secretsmanager').get_secret_value(SecretId=...)
- Node.js:new AWS.SecretsManager().getSecretValue({ SecretId })
取得したシークレットは プロセスのメモリ上だけに保持し、ログや例外メッセージに出力しないよう徹底します。定期的なローテーション(90 日ごと)を自動化すれば、漏洩リスクを最小限に抑えられます。
エラーハンドリングと再試行ロジック、ログ出力手法
障害は ステータスコードで分類 し、以下の方針で対処します。
- 5xx 系 / 429(レートリミット) → 指数バックオフ+ジャitter で最大 5 回まで再試行。
- 4xx 系(認証エラー・バリデーションエラー) → 即時通知と手動対応が必要なため、例外として上位へ伝搬。
Python での再試行ユーティリティ(抜粋)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
def retry_request(func, max_attempts=5): wait = 1 for attempt in range(1, max_attempts + 1): try: return func() except (requests.HTTPError, SalesforceMalformedRequest) as e: status = getattr(e.response, 'status_code', None) if status and (status >= 500 or status == 429): logger.warning(f'Attempt {attempt} failed ({status}); retry after {wait}s') time.sleep(wait + random.uniform(0, 0.5)) wait *= 2 else: logger.error(f'Non‑retriable error: {e}') raise raise RuntimeError('Max retries exceeded') |
- 構造化ログは JSON 形式で出力し、CloudWatch Logs Insights や Datadog のクエリで容易にフィルタリングできるようにします。
- 通知は失敗レコード数が閾値を超えた場合に SNS → Slack/Teams に流すと、運用担当者の即応が可能です。
監視・メンテナンスポイント(レートリミット、データ整合性チェック)
| 監視項目 | 実装手段 | アラート条件 |
|---|---|---|
| Pipedrive API 呼び出し数 | CloudWatch カスタムメトリクス pipedrive.api.calls(Lambda が呼び出すたびにインクリメント) |
1 分間の呼び出しが 90 件 超えたら SNS 通知 |
| Salesforce API 使用量 | Salesforce の「API Usage」レポートを 5 分ごとに取得し CloudWatch にプッシュ | 24h あたり 95% 超過時に PagerDuty 発報 |
| Deal ↔︎ Opportunity 整合性 | Nightly バッチで ExternalId__c が双方に存在するか照合 |
不一致率が 0.5 % を超えると Slack に警告 |
| トークン有効期限 | Secrets Manager のシークレット更新日時を監視し、残存日数 < 7 日でリフレッシュ通知 | - |
メンテナンスのベストプラクティス
- トークンローテーションは自動化(Lambda → Secrets Manager 更新)し、古いトークンは即座に無効化。
- スキーマ変更(例: カスタム項目追加)は Git 管理されたマッピング表の差分テストを CI パイプラインで走らせることで、本番デプロイ前に破壊的影響を検出できます。
導入支援とチェックリストのご案内
本稿で提示したハイブリッド連携手法は、ノーコードツールで素早く PoC を立ち上げた後、スクリプト化して本番運用に移行する流れが推奨されます。以下のチェックリストを順に実施すれば、抜け漏れなく安全かつ拡張性の高い連携基盤が構築できます。
- 認証情報の安全な保管
- Secrets Manager / Vault に API トークン・Refresh Token を格納。
-
IAM ポリシーで最小権限を付与し、ローテーションスケジュールを設定。
-
データマッピング表の作成とレビュー
- キー項目(Deal ID ↔︎ Opportunity ExternalId)・必須項目・変換ロジックを 3 カラムで整理。
-
ビジネスステークホルダーと合意形成し、Git リポジトリでバージョン管理。
-
ノーコードツールで PoC 実施
- Zapier または Make で「Deal → Opportunity」フローを作成。
-
レートリミット・エラーハンドリングの挙動を検証し、必要に応じてフィルタやバッファ処理を追加。
-
自前スクリプトで本番バッチ構築
- Python または Node.js のサンプルをベースに、再試行・ロギング・監視コードを組み込む。
-
CI/CD パイプラインで単体テスト・統合テスト(Mock API)を走らせる。
-
エラーハンドリング・再試行ロジック実装
- 指数バックオフ+ジャitter を共通ユーティリティ化。
-
失敗レコードは構造化ログに残し、Slack/PagerDuty で自動通知。
-
監視・アラート基盤の設定
- API 使用量・レートリミット・データ整合性を CloudWatch / Stackdriver にメトリクス化。
-
閾値超過時は SNS → Slack/Teams へ通知し、オンコール体制と連携。
-
定期的なレビューと改善
- 月次で API 使用レポートを確認し、プラン変更やバルク処理の導入可否を評価。
- 新規カスタム項目追加時はマッピング表・テストケースを更新し、デプロイ前にリハーサル実施。
このフローに沿って進めれば、Pipedrive と Salesforce の双方向同期が安全かつスケーラブルに実現します。導入段階で不明点や独自要件(例:多言語項目・通貨換算ロジック)が出てきた場合は、専門コンサルタントやシステムインテグレーターへの相談をご検討ください。