mitmproxy: proxy any network traffic through your local machine

mitmproxy is your Swiss Army knife for interactive HTTP/HTTPS proxying. It can intercept, inspect, modify, and replay web traffic, including HTTP/1, HTTP/2, WebSockets, and any other SSL/TLS-protected protocol.

Moreover, mitmproxy has a powerful Python API that offers full control over any intercepted request and response.

ref:
https://mitmproxy.org/
https://docs.mitmproxy.org/stable/

Concept

ref:
https://docs.mitmproxy.org/stable/concepts-howmitmproxyworks/

Installation

$ brew install mitmproxy

$ mitmproxy --version
Mitmproxy: 4.0.4
Python:    3.7.0
OpenSSL:   OpenSSL 1.0.2p  14 Aug 2018
Platform:  Darwin-18.0.0-x86_64-i386-64bit

ref:
https://docs.mitmproxy.org/stable/overview-installation/

Configuration

Make your computer the man in the man-in-the-middle.

macOS

$ ipconfig getifaddr en0
192.168.10.102

$ mitmproxy -p 8080

Flow List keys:

  • ?: Show help
  • f: Set view filter
  • r: Replay this flow

Flow Details keys:

  • e: Edit this flow

ref:
https://docs.mitmproxy.org/stable/tools-mitmproxy/
https://github.com/mitmproxy/mitmproxy/blob/master/mitmproxy/tools/console/defaultkeys.py

iOS

  • Go to Settings > Wi-Fi > Your Wi-Fi > Configure Proxy
    • Select Manual, enter the following values:
      • Server: 192.168.10.102
      • Port: 8080
      • Authentication: unchecked
  • Open http://mitm.it/ on Safari
    • Install the corresponding certificate for your device
  • Go to Settings > General > About > Certificate Trust Settings
    • Turn on the mitmproxy certificate
  • Open any app you want to watch

ref:
https://docs.mitmproxy.org/stable/concepts-certificates/

Usage

The most exciting feature is that you can alter any request and response with a Python script, via mitmdump!

ref:
https://docs.mitmproxy.org/stable/tools-mitmdump/
https://github.com/mitmproxy/mitmproxy/tree/master/examples

Redirect Requests To Your Local Development Server

# redirect_to_localhost.py
from mitmproxy import ctx
from mitmproxy import http

REMOTE_HOST = 'api.example.com'
DEV_HOST = '192.168.0.128'
DEV_PORT = 8000

def request(flow: http.HTTPFlow) -> None:
    if flow.request.pretty_host == REMOTE_HOST:
        ctx.log.info('=== request')
        ctx.log.info(str(flow.request.headers))
        ctx.log.info(f'content: {str(flow.request.content)}')

        flow.request.scheme = 'http'
        flow.request.host = DEV_HOST
        flow.request.port = DEV_PORT

def response(flow: http.HTTPFlow) -> None:
    if flow.request.pretty_host == DEV_HOST:
        ctx.log.info('=== response')
        ctx.log.info(str(flow.response.headers))
        if flow.response.headers.get('Content-Type', '').startswith('image/'):
            return
        ctx.log.info(f'body: {str(flow.response.get_content())}')

mitmproxy Scripting API
https://github.com/mitmproxy/mitmproxy/tree/master/examples

Currently, changing the host of an HTTP/2 connection is not allowed. If you don't need HTTP/2 for local development, you can simply disable HTTP/2 proxying to work around the issue.

$ mitmdump -p 8080 \
--ignore-hosts "apple.com|itunes.com" \
--no-http2 \
-s redirect_to_localhost.py

ref:
https://discourse.mitmproxy.org/t/reverse-mode-change-request-host-according-to-the-sni-https/466

Integrate with Mixpanel API in Python

Mixpanel is one of the leading business analytics services. But it is expensive.

ref:
https://mixpanel.com/report/1262196/dashboard

Installation

$ pip install mixpanel==4.3.2

ref:
https://pypi.org/project/mixpanel/

Configuration

You probably don't want to send Mixpanel events synchronously and block your application, so just put them into a Celery task. Maybe with a shared requests session.

# in app.py
from celery import Celery
from flask import Flask
import boltons.cacheutils
import requests

def make_celery(app):
    celery = Celery(
        'YOUR_APP_NAME',
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL'],
    )

    class ContextTask(celery.Task):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                # pass an extra `task` parameter in each Celery task for accessing Task properties
                return celery.Task.__call__(self, *args, **kwargs)

        @boltons.cacheutils.cachedproperty
        def session(self):
            session = requests.Session()
            session.mount('https://', requests.adapters.HTTPAdapter(
                pool_maxsize=10,
                pool_connections=10,
            ))
            return session

        @boltons.cacheutils.cachedproperty
        def mixpanel(self):
            import mixpanel

            class RequestsConsumer(mixpanel.Consumer):

                def __init__(self, client=None, *args, **kwargs):
                    self.client = client or requests
                    super(RequestsConsumer, self).__init__(*args, **kwargs)

                def _write_request(self, request_url, json_message, api_key=None):
                    data = {
                        'data': mixpanel.base64.b64encode(json_message.encode('utf8')),
                        'verbose': 1,
                        'ip': 0,
                    }
                    if api_key:
                        data.update({'api_key': api_key})

                    try:
                        response = self.client.post(
                            request_url, data=data, timeout=self._request_timeout,
                        )
                        response.raise_for_status()
                    except requests.exceptions.HTTPError as err:
                        raise mixpanel.MixpanelException(err) from err

                    try:
                        data = response.json()
                    except ValueError:
                        raise mixpanel.MixpanelException(f'Cannot interpret Mixpanel server response: {response}')
                    else:
                        if data['status'] != 1:
                            raise mixpanel.MixpanelException(f"Mixpanel error: {data['error']}")
                    return True

            mp = mixpanel.Mixpanel('MIXPANEL_PROJECT_TOKEN', consumer=RequestsConsumer(client=self.session))
            return mp

    celery.Task = ContextTask
    return celery

app = Flask('simple-project')
app.celery = make_celery(app)

ref:
https://github.com/mixpanel/mixpanel-python
http://docs.celeryproject.org/en/latest/reference/celery.app.task.html

Usage

ref:
https://mixpanel.com/help/reference/python
https://mixpanel.github.io/mixpanel-python/

Property Data Types

Event and user properties support following data types:

  • String
  • Number
  • Boolean
  • Date
  • List

Mixpanel does not allow dict in properties.
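
Since nested dicts are rejected, one option is to flatten them into dotted keys before tracking. A minimal hand-rolled helper (not part of the mixpanel library):

```python
def flatten_properties(properties, parent_key=''):
    """Flatten nested dicts into Mixpanel-friendly dotted keys."""
    flat = {}
    for key, value in properties.items():
        dotted_key = f'{parent_key}.{key}' if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten_properties(value, dotted_key))
        else:
            flat[dotted_key] = value
    return flat

flatten_properties({'post': {'id': 123, 'title': 'Hello'}, 'tags': ['a', 'b']})
# {'post.id': 123, 'post.title': 'Hello', 'tags': ['a', 'b']}
```

Lists pass through unchanged, since Mixpanel supports them as-is.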

ref:
https://help.mixpanel.com/hc/en-us/articles/115004547063-Properties-Supported-Data-Types

Attach Extra Attributes To User Profiles

# ext_mixpanel/tasks.py
import datetime

import celery
import mixpanel

@celery.shared_task(bind=True)
def people_set(task, *args, **kwargs):
    return task.mixpanel.people_set(*args, **kwargs)

people_set.apply_async(kwargs=dict(
    distinct_id=str(user.id),
    properties={
        '$email': user.email,
        'username': user.username,
        'signed_up_at': mixpanel.serialize(datetime.datetime.utcnow()),
    },
))

ref:
https://mixpanel.com/help/reference/python#creating-profiles
https://mixpanel.github.io/mixpanel-python/#mixpanel.Mixpanel.people_set

Track Events

# ext_mixpanel/tasks.py
import celery

@celery.shared_task(bind=True)
def track(task, *args, **kwargs):
    return task.mixpanel.track(*args, **kwargs)

track.apply_async(kwargs=dict(
    distinct_id=post.author.id,
    event_name='post.sent',
    properties={
        'post.id': post.id,
        'post.title': post.title,
        'author.id': post.author.id,
        'author.username': post.author.username,
        'tags': [str(tag) for tag in post.tags],
    },
))

ref:
https://mixpanel.com/help/reference/python#sending-events
https://mixpanel.github.io/mixpanel-python/#mixpanel.Mixpanel.track

Track Revenues

# ext_mixpanel/tasks.py
import celery

@celery.shared_task(bind=True)
def people_track_charge(task, *args, **kwargs):
    return task.mixpanel.people_track_charge(*args, **kwargs)

people_track_charge.apply_async(kwargs=dict(
    distinct_id=order.customer.id,
    amount=round(order.amount_in_usd, 2),
    properties={
        'payment_method': order.payment_method,
    },
))

ref:
https://mixpanel.com/help/reference/python#tracking-revenue
https://mixpanel.github.io/mixpanel-python/#mixpanel.Mixpanel.people_track_charge

JQL

function main() {
  return Events(
    {
      event_selectors: [{event: 'user.registered', selector: ''}],
      from_date: '2018-09-22',
      to_date: '2018-09-22'
    },
  )
  .reduce(mixpanel.reducer.count());
}

ref:
https://mixpanel.com/help/reference/jql/api-reference

CodeTengu Weekly Issue 135 @vinta - Kubernetes, Python, MongoDB

This post was simultaneously published in CodeTengu Weekly - Issue 135.

The incomplete guide to Google Kubernetes Engine

Based on my recent adventures with Kubernetes, I wrote up an article to share with everyone; I hope it helps. It covers the core concepts, creating a cluster, adding node pools, deploying a ConfigMap, a Deployment with LivenessProbe/ReadinessProbe, Horizontal Pod Autoscaler, Pod Disruption Budget, StatefulSet, and DaemonSet, as well as the relationship between Service and Ingress and practical uses of Node Affinity and Pod Affinity.

By the way, even if you are just setting up a cluster to play with, I recommend creating a preemptible (similar to AWS Spot Instances) k8s cluster directly on Google Kubernetes Engine. It is dirt cheap, so there is no reason to keep using minikube. Incidentally, even Amazon now has its own managed Kubernetes; my current company uses GCP, but I still miss AWS.

Fluent Python

Even though I have been writing Python for quite a while, I still learn a lot every time I read this book. Sincerely recommended.

The book I read when I first learned Python was a different one, Learning Python. I looked it up, and wow, it is already in its fifth edition.

A deep dive into the PyMongo MongoDB driver

A Replica Set is usually the standard MongoDB deployment (the next step up is sharding). This talk explains in detail how a Replica Set handles service discovery and how PyMongo communicates with a Replica Set.

Let's talk about usernames

Like the Falsehoods series we have mentioned many times before, this article patiently reminds us that usernames, something almost every system and website has, are not as simple as you might think. Take a moment to let that sink in.

The author also brings up an important idea, The Tripartite Identity Pattern, which splits the so-called ID into three kinds:

  1. System-level identifier, suitable for use as a target of foreign keys in our database
  2. Login identifier, suitable for use in performing a credential check
  3. Public identity, suitable for displaying to other users

rather than trying to cover every use case with a single identifier.
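
The three-way split can be sketched as a tiny model (all names here are illustrative, not from the article):

```python
import dataclasses
import uuid

@dataclasses.dataclass(frozen=True)
class UserIdentity:
    system_id: str    # stable foreign-key target; never shown to users, never changes
    login_id: str     # used only for credential checks, e.g. an email address
    public_name: str  # display identity; safe to let the user rename

user = UserIdentity(
    system_id=str(uuid.uuid4()),
    login_id='alice@example.com',
    public_name='Alice',
)

# renaming only touches the public identity; foreign keys and login stay intact
user = dataclasses.replace(user, public_name='Alice Liddell')
```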

Web Architecture 101

This article is an accessible walkthrough of the components a modern web service typically includes. Honestly, though, if you are a junior backend engineer today, how much time and effort would it take to learn everything this article mentions, let alone the lower-level knowledge underneath? And it does not even touch on DevOps. Much like Will Kubernetes Collapse Under the Weight of Its Complexity?, which I read a while ago, it makes me feel that the way things have evolved is not very friendly to newcomers (or even to ordinary 1x engineers like us).

Integrate with Google Cloud API in Python

Installation

$ pipenv install google-cloud

# you could install specific components you want
$ pipenv install google-cloud-storage

ref:
https://google-cloud-python.readthedocs.io/en/latest/index.html

Google Cloud Storage

It is worth noting that initializing storage.Client() is a blocking call.
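
If that blocking call hurts startup time, one option is to create the client lazily behind a cached factory, so it runs once on first use. A runnable sketch with a stand-in for storage.Client() (in real code the factory body would be `from google.cloud import storage; return storage.Client()`):

```python
import functools

init_calls = []

@functools.lru_cache(maxsize=1)
def get_storage_client():
    init_calls.append(1)  # stand-in for the blocking storage.Client() call
    return object()

client_a = get_storage_client()
client_b = get_storage_client()
# the expensive initialization ran exactly once; both callers share the client
```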

ref:
https://google-cloud-python.readthedocs.io/en/latest/storage/buckets.html
https://cloud.google.com/storage/docs/reference/libraries

Update A File's Metadata

from google.cloud import storage

storage_client = storage.Client()
source_bucket = storage_client.get_bucket('asia.public.swag.live')
source_file = source_bucket.get_blob('launchs/57c16f5bb811055b66d8ef46.jpg')
source_file.metadata = {
    'Link': '<https://api.v2.swag.live/users/57c16f5bb811055b66d8ef46>; rel="user"',
}
source_file.patch()

ref:
https://github.com/GoogleCloudPlatform/google-cloud-python/issues/1185

Copy A File

from google.cloud import storage

def copy_file(source_bucket, source_name, destination_bucket, destination_name):
    storage_client = storage.Client()
    source_bucket = storage_client.get_bucket(source_bucket)
    source_file = source_bucket.blob(source_name)
    destination_bucket = storage_client.get_bucket(destination_bucket)
    destination_file = source_bucket.copy_blob(source_file, destination_bucket, destination_name)
    return destination_file

file_ext_mapping = {
    'image/jpeg': 'jpg',
    'video/mp4': 'mp4',
}
file_ext = file_ext_mapping[original_message.media.mimetype]
source_name = f'messages/{original_message.id}.{file_ext}'
destination_name = f'messages/{new_message.id}.{file_ext}'

copy_file(
    source_bucket='asia.uploads.swag.live',
    source_name=source_name,
    destination_bucket='asia.uploads.swag.live',
    destination_name=destination_name,
)

ref:
https://cloud.google.com/storage/docs/json_api/v1/objects/copy
https://cloud.google.com/storage/docs/renaming-copying-moving-objects#storage-copy-object-python

Copy A Folder With Batch Operations

from google.cloud import storage

def copy_files(source_bucket_name, source_name_prefix, destination_bucket_name, fix_destination_name_func=None):
    storage_client = storage.Client()
    source_bucket = storage_client.get_bucket(source_bucket_name)
    destination_bucket = storage_client.get_bucket(destination_bucket_name)
    blobs = source_bucket.list_blobs(prefix=source_name_prefix)

    # NOTE: you cannot check blobs.num_results here:
    # blobs is a lazy HTTP iterator, so num_results stays 0 until it is consumed
    # if not blobs.num_results:
    #     raise ValueError(f'No objects matched: gs://{source_bucket.name}/{source_name_prefix}')

    with storage_client.batch():
        for source_blob in blobs:
            destination_name = fix_destination_name_func(source_blob.name) if callable(fix_destination_name_func) else source_blob.name
            source_bucket.copy_blob(source_blob, destination_bucket, destination_name)
    return True

source_bucket_name = 'asia.uploads.swag.live'
destination_bucket_name = 'asia.contents.swag.live'
source_name_prefix = 'forum-posts/123'

copy_files(
    source_bucket_name=source_bucket_name,
    destination_bucket_name=destination_bucket_name,
    source_name_prefix=source_name_prefix,
    fix_destination_name_func=lambda source_name: source_name.replace(source_name_prefix, 'forum-posts'),
)

which is equivalent to:

$ gsutil cp -r "gs://asia.uploads.swag.live/forum-posts/123/*" "gs://asia.contents.swag.live/"

ref:
https://cloud.google.com/storage/docs/listing-objects

batch() does not guarantee the order of execution, so do not mix different types of calls in the same batch. For instance, a single batch should not contain a "copy a.txt" call followed by a "delete a.txt" call.
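
One defensive approach is to partition your calls by type and submit each group in its own batch; the batch() usage itself stays the same as in the examples above. The grouping step is plain Python:

```python
from collections import defaultdict

def partition_by_op(operations):
    """Group (op, blob_name) pairs so each batch contains only one kind of call."""
    groups = defaultdict(list)
    for op, blob_name in operations:
        groups[op].append(blob_name)
    return dict(groups)

ops = [('copy', 'a.txt'), ('delete', 'a.txt'), ('copy', 'b.txt')]
partition_by_op(ops)
# {'copy': ['a.txt', 'b.txt'], 'delete': ['a.txt']}
# then run one batch for all copies, and a separate batch for all deletes
```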

ref:
https://googlecloudplatform.github.io/google-cloud-python/latest/storage/batch.html

Upload A File Directly To A Bucket

We first need to generate a signed upload URL; then we can upload the file to that URL.

import base64
import datetime
import time

from oauth2client.client import GoogleCredentials
import yarl

credentials = GoogleCredentials.get_application_default()

def signurl(method, url, content_type=None, expires_at=None, md5sum=None, meta=None):
    method, is_resumable = method.upper(), False
    if method in ['RESUMABLE']:
        method, is_resumable = 'POST', True
    path = yarl.URL(url).path

    def signature():
        def _signature_parts():
            def _meta():
                for key, value in (meta or {}).items():
                    yield 'x-goog-meta-{key}:{value}'.format(key=key, value=value)
                if is_resumable:
                    yield 'x-goog-resumable:start'

            yield method
            yield md5sum or ''
            # we need to use `curl -H 'content-type:'` to upload if we sign an empty content-type
            yield content_type or 'application/octet-stream'
            yield str(int(time.mktime(expires_at.timetuple()))) if expires_at else ''
            yield from sorted(_meta())
            yield path

        _, signature = credentials.sign_blob('\n'.join(_signature_parts()))
        return base64.b64encode(signature).decode('utf-8')

    def params():
        yield 'GoogleAccessId', credentials.service_account_email
        if expires_at:
            yield 'Expires', int(time.mktime(expires_at.timetuple()))
        yield 'Signature', signature()

    return str(yarl.URL(url).with_query(**dict(params())))

signurl(
    method='RESUMABLE',
    url='https://storage.googleapis.com/asia.uploads.swag.live/forum-posts/your-filename.ext',
    expires_at=datetime.datetime.utcnow() + datetime.timedelta(hours=24),
)

$ curl -v -X 'POST' \
-H 'content-type: application/octet-stream' \
-H 'x-goog-resumable:start' \
-d '' 'THE_SIGNED_UPLOAD_URL'

$ curl -v -X PUT \
--upload-file whatever.mp4 \
THE_URL_FROM_LOCATION_HEADER_OF_THE_ABOVE_RESPONSE

ref:
https://cloud.google.com/storage/docs/access-control/signed-urls#signing-resumable
https://cloud.google.com/storage/docs/xml-api/resumable-upload
https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload
https://cloud.google.com/storage/docs/uploading-objects

Setup Celery with your Flask project

Installation

$ pipenv install flask "celery[redis,gevent]"

ref:
http://docs.celeryproject.org/en/latest/index.html
https://github.com/celery/celery

Configuration

$ tree simple-api
simple-api
├── Dockerfile
├── Pipfile
├── Pipfile.lock
├── app.py
├── requirements.txt
└── simple_api
    ├── bar
    │   ├── __init__.py
    │   ├── endpoints.py
    │   └── tasks.py
    ├── foo
    │   ├── __init__.py
    │   ├── endpoints.py
    │   └── tasks.py
    ├── __init__.py
    └── tasks.py

# simple_api/celeryconfig.py
import os

broker_url = os.environ.get('CELERY_BROKER_URL') or 'redis://127.0.0.1:6379/0'
result_backend = os.environ.get('CELERY_RESULT_BACKEND') or 'redis://127.0.0.1:6379/1'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']

task_eager_propagates = True
task_ignore_result = True

timezone = 'UTC'
enable_utc = True

# simple_api/__init__.py
from celery import Celery
from flask import Flask

def make_celery(app):
    celery = Celery(app.import_name)
    celery.config_from_object('simple_api.celeryconfig')

    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

def create_app():
    app = Flask(__name__)
    app.config.from_object('simple_api.config')

    from . import tasks

    app.celery = make_celery(app)
    return app

# app.py
import simple_api

app = simple_api.create_app()
celery = app.celery

ref:
http://flask.pocoo.org/docs/1.0/patterns/celery/
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#configuration
http://docs.celeryproject.org/en/latest/userguide/application.html
http://docs.celeryproject.org/en/latest/userguide/configuration.html

Tasks

# simple_api/tasks.py
import celery

@celery.shared_task()
def sleep(message, seconds=1):
    import time
    time.sleep(seconds)
    print(message)
    return seconds

ref:
http://docs.celeryproject.org/en/latest/userguide/tasks.html

# simple_api/whatever.py
from simple_api import tasks

tasks.sleep.delay('Hello World', seconds=5)

ref:
http://docs.celeryproject.org/en/latest/userguide/calling.html

Usage

# run web
$ FLASK_APP=app.py FLASK_ENV=development flask run

# run workers
$ celery -A app:celery worker -P gevent --without-gossip -c 100 --prefetch-multiplier 1 -Ofair -l info

ref:
http://docs.celeryproject.org/en/latest/userguide/optimizing.html