mitmproxy: proxy any network traffic through your local machine

mitmproxy is your swiss-army knife for debugging HTTP/HTTPS traffic: an interactive proxy that can intercept, inspect, modify, and replay web traffic such as HTTP/1, HTTP/2, WebSockets, or any other SSL/TLS-protected protocol.

Moreover, mitmproxy has a powerful Python API that offers full control over any intercepted request and response.

ref:
https://mitmproxy.org/
https://docs.mitmproxy.org/stable/

Concept

ref:
https://docs.mitmproxy.org/stable/concepts-howmitmproxyworks/

Installation

$ brew install mitmproxy

$ mitmproxy --version
Mitmproxy: 4.0.4
Python:    3.7.0
OpenSSL:   OpenSSL 1.0.2p  14 Aug 2018
Platform:  Darwin-18.0.0-x86_64-i386-64bit

ref:
https://docs.mitmproxy.org/stable/overview-installation/

Configuration

Make your computer the "man" in the man-in-the-middle attack.

macOS

$ ipconfig getifaddr en0
192.168.0.128

$ mitmproxy -p 8888
# or
$ mitmweb -p 8888
$ open http://127.0.0.1:8081/

Flow List keys:

  • ?: Show help
  • q: Exit the current view
  • f: Set view filter
  • r: Replay this flow
  • i: Set intercept filter
  • hjkl or arrow: Move left/down/up/right
  • enter: Select

Flow Details keys:

  • tab: Select next
  • m: Set flow view mode
  • e: Edit this flow (request or response)
  • a: Accept this intercepted flow

ref:
https://docs.mitmproxy.org/stable/tools-mitmproxy/
https://github.com/mitmproxy/mitmproxy/blob/master/mitmproxy/tools/console/defaultkeys.py

iOS

  • Go to Settings > Wi-Fi > Your Wi-Fi > Configure Proxy
    • Select Manual, enter the following values:
      • Server: 192.168.0.128
      • Port: 8888
      • Authentication: unchecked
  • Open http://mitm.it/ on Safari
    • Install the corresponding certificate for your device
  • Go to Settings > General > About > Certificate Trust Settings
    • Turn on the mitmproxy certificate
  • Open any app you want to watch

ref:
https://docs.mitmproxy.org/stable/concepts-certificates/

Usage

The most exciting feature is that you can alter any request and response with a Python script, via mitmdump -s!
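
For instance, a minimal addon sketch (the filename and header name are made up for illustration) that tags every response passing through the proxy:

# add_header.py
from mitmproxy import http

def response(flow: http.HTTPFlow) -> None:
    # mark intercepted responses so you can tell them apart on the client
    flow.response.headers['x-mitm'] = 'yes'

$ mitmdump -p 8888 -s add_header.py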

ref:
https://docs.mitmproxy.org/stable/tools-mitmdump/
https://github.com/mitmproxy/mitmproxy/tree/master/examples

Deal With Certificate Pinning

You can use your own certificate by passing the --certs example.com=/path/to/example.com.pem option to mitmproxy, which then uses the provided certificate to intercept traffic for the specified domain.

The certificate file is expected to be in PEM format, which roughly looks like this:

-----BEGIN PRIVATE KEY-----
<private key>
-----END PRIVATE KEY-----

-----BEGIN CERTIFICATE-----
<cert>
-----END CERTIFICATE-----

-----BEGIN CERTIFICATE-----
<intermediary cert (optional)>
-----END CERTIFICATE-----

$ mitmproxy -p 8888 --certs example.com=example.com.pem

ref:
https://docs.mitmproxy.org/stable/concepts-certificates/#using-a-custom-server-certificate

Redirect Requests To Your Local Development Server

# redirect_to_localhost.py
from mitmproxy import ctx
from mitmproxy import http

REMOTE_HOST = 'api.example.com'
DEV_HOST = '192.168.0.128'
DEV_PORT = 8000

def request(flow: http.HTTPFlow) -> None:
    if flow.request.pretty_host in [REMOTE_HOST, DEV_HOST]:
        ctx.log.info('=== request')
        ctx.log.info(str(flow.request.headers))
        ctx.log.info(f'content: {str(flow.request.content)}')

        flow.request.scheme = 'http'
        flow.request.host = DEV_HOST
        flow.request.port = DEV_PORT

def response(flow: http.HTTPFlow) -> None:
    if flow.request.pretty_host == DEV_HOST:
        ctx.log.info('=== response')
        ctx.log.info(str(flow.response.headers))
        if flow.response.headers.get('Content-Type', '').startswith('image/'):
            return
        ctx.log.info(f'body: {str(flow.response.get_content())}')

ref:
https://discourse.mitmproxy.org/t/reverse-mode-change-request-host-according-to-the-sni-https/466

You could use a negative lookahead regex with --ignore-hosts to only watch specific domains. Of course, you can still blacklist any domain you don't want: --ignore-hosts 'apple.com|icloud.com|itunes.com|facebook.com|googleapis.com|crashlytics.com'.

Currently, changing the upstream host for HTTP/2 connections is not allowed, but you can simply disable HTTP/2 proxying (--no-http2) to work around the issue if you don't need HTTP/2 for local development.

$ mitmdump -p 8888 \
--certs example.com=example.com.pem \
-v --flow-detail 3 \
--ignore-hosts '^(?!.*example\.com)' \
--no-http2 \
-s redirect_to_localhost.py

ref:
https://stackoverflow.com/questions/29414158/regex-negative-lookahead-with-wildcard

Integrate with Google Cloud API in Python

google-cloud is the collection of idiomatic Python clients for Google Cloud Platform services. There is also an older Python library officially supported by Google, google-api-python-client, which is in maintenance mode.

ref:
https://github.com/googleapis/google-cloud-python
https://github.com/googleapis/google-api-python-client

Installation

$ pip install google-cloud

# you could also only install specific components
$ pip install google-cloud-storage

ref:
https://pypi.org/search/?q=google+cloud

Google Cloud Storage

It is worth noting that initializing storage.Client() is a blocking call.

ref:
https://googleapis.github.io/google-cloud-python/latest/storage/buckets.html
https://cloud.google.com/storage/docs/reference/libraries
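
Because of that, it is common to create the client once and reuse it instead of constructing one per request. A minimal sketch (the function name is made up):

from google.cloud import storage

# create the client once at module load time:
# the constructor resolves credentials and can block for a while
client = storage.Client()

def list_bucket_names():
    return [bucket.name for bucket in client.list_buckets()]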

Upload From String

from google.cloud import storage
from google.cloud.storage.bucket import Bucket
from google.cloud.storage.blob import Blob

def upload_from_string(bucket_id, content, filename, content_type):
    client = storage.Client()
    bucket = Bucket(client, bucket_id)
    blob = Blob(filename, bucket)
    blob.upload_from_string(content, content_type)

Upload From A URL

from google.cloud import storage
import requests

def upload_from_url(bucket_id, filename, url):
    client = storage.Client()
    session = requests.Session()
    with session.get(url, stream=True) as response:
        bucket = client.get_bucket(bucket_id)
        blob = bucket.blob(filename)
        blob.upload_from_file(response.raw, content_type=response.headers.get('Content-Type'))
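
Usage (the bucket name and URL are placeholders):

upload_from_url('asia.uploads.example.com', 'images/remote.jpg', 'https://example.com/remote.jpg')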

Update A File's Metadata

from google.cloud import storage

def update_metadata(bucket_name, filepath, new_metadata):
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blob = bucket.get_blob(filepath)
    blob.metadata = {**blob.metadata, **new_metadata} if blob.metadata else new_metadata
    blob.patch()

new_metadata = {
    'Link': '<https://api.example.com/users/57c16f5bb811055b66d8ef46>; rel="user"',
}
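
Then apply it (the object path is a placeholder):

update_metadata('asia.contents.example.com', 'media/123/xxx.jpg', new_metadata)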

ref:
https://github.com/GoogleCloudPlatform/google-cloud-python/issues/1185

Copy A File

from google.cloud import storage

def copy_file(source_bucket, source_name, destination_bucket, destination_name):
    storage_client = storage.Client()
    source_bucket = storage_client.get_bucket(source_bucket)
    source_file = source_bucket.blob(source_name)
    destination_bucket = storage_client.get_bucket(destination_bucket)
    destination_file = source_bucket.copy_blob(source_file, destination_bucket, destination_name)
    return destination_file

file_ext_mapping = {
    'image/jpeg': 'jpg',
    'video/mp4': 'mp4',
}
file_ext = file_ext_mapping[original_message.media.mimetype]
source_name = f'messages/{original_message.id}.{file_ext}'
destination_name = f'messages/{new_message.id}.{file_ext}'

copy_file(
    source_bucket='asia.uploads.example.com',
    source_name=source_name,
    destination_bucket='asia.uploads.example.com',
    destination_name=destination_name,
)

ref:
https://cloud.google.com/storage/docs/json_api/v1/objects/copy
https://cloud.google.com/storage/docs/renaming-copying-moving-objects#storage-copy-object-python

Copy A Folder With Batch Operations

from google.cloud import storage

def copy_files(source_bucket_name, source_name_prefix, destination_bucket_name, fix_destination_name_func=None):
    storage_client = storage.Client()
    source_bucket = storage_client.get_bucket(source_bucket_name)
    destination_bucket = storage_client.get_bucket(destination_bucket_name)
    blobs = source_bucket.list_blobs(prefix=source_name_prefix)

    # NOTE: you cannot check `blobs.num_results` here:
    # `blobs` is a lazy HTTP iterator and its `num_results` is always 0 before iteration,
    # so materialize it first (e.g. `blobs = list(blobs)`) if you need to raise
    # when nothing matched gs://{source_bucket.name}/{source_name_prefix}

    with storage_client.batch():
        for source_blob in blobs:
            destination_name = fix_destination_name_func(source_blob.name) if callable(fix_destination_name_func) else source_blob.name
            source_bucket.copy_blob(source_blob, destination_bucket, destination_name)
    return True

source_bucket_name = 'asia.uploads.example.com'
destination_bucket_name = 'asia.contents.example.com'
source_name_prefix = 'media/123'

copy_files(
    source_bucket_name=source_bucket_name,
    destination_bucket_name=destination_bucket_name,
    source_name_prefix=source_name_prefix,
    fix_destination_name_func=lambda source_name: source_name.replace(source_name_prefix, 'forum-posts'),
)

which is equivalent to:

$ gsutil cp -r "gs://asia.uploads.example.com/media/123/*" "gs://asia.contents.example.com/"

ref:
https://cloud.google.com/storage/docs/listing-objects

batch() does not guarantee the order of execution, so do not mix different types of calls in the same batch. For instance, a batch should not contain both "copy a.txt" and "delete a.txt".

ref:
https://googlecloudplatform.github.io/google-cloud-python/latest/storage/batch.html
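
For example, to implement a "move" of a folder, submit all copies in one batch and all deletes in a second batch. A sketch under the same assumptions as copy_files above:

from google.cloud import storage

def move_files(source_bucket_name, source_name_prefix, destination_bucket_name):
    storage_client = storage.Client()
    source_bucket = storage_client.get_bucket(source_bucket_name)
    destination_bucket = storage_client.get_bucket(destination_bucket_name)
    blobs = list(source_bucket.list_blobs(prefix=source_name_prefix))

    # first batch: copy every object
    with storage_client.batch():
        for source_blob in blobs:
            source_bucket.copy_blob(source_blob, destination_bucket, source_blob.name)

    # second batch: delete the originals, issued only after all the copies
    with storage_client.batch():
        for source_blob in blobs:
            source_blob.delete()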

Upload A File Directly To A Bucket

We first need to generate a signed upload URL; then we can upload the file to that URL.

import base64
import datetime
import time

from oauth2client.client import GoogleCredentials
import yarl

credentials = GoogleCredentials.get_application_default()

def signurl(method, url, content_type=None, expires_at=None, md5sum=None, meta=None):
    method, is_resumable = method.upper(), False
    if method in ['RESUMABLE']:
        method, is_resumable = 'POST', True
    path = yarl.URL(url).path

    def signature():
        def _signature_parts():
            def _meta():
                for key, value in (meta or {}).items():
                    yield 'x-goog-meta-{key}:{value}'.format(key=key, value=value)
                if is_resumable:
                    yield 'x-goog-resumable:start'

            yield method
            yield md5sum or ''
            # we need to use `curl -H 'content-type:'` to upload if we sign an empty content-type
            yield content_type or 'application/octet-stream'
            yield str(int(time.mktime(expires_at.timetuple()))) if expires_at else ''
            yield from sorted(_meta())
            yield path

        _, signature = credentials.sign_blob('\n'.join(_signature_parts()))
        return base64.b64encode(signature).decode('utf-8')

    def params():
        yield 'GoogleAccessId', credentials.service_account_email
        if expires_at:
            yield 'Expires', int(time.mktime(expires_at.timetuple()))
        yield 'Signature', signature()

    return str(yarl.URL(url).with_query(**dict(params())))

signurl(
    method='RESUMABLE',
    url='https://storage.googleapis.com/asia.uploads.example.com/media/your-filename.ext',
    expires_at=datetime.datetime.utcnow() + datetime.timedelta(hours=24),
)

$ curl -v -X 'POST' \
-H 'content-type: application/octet-stream' \
-H 'x-goog-resumable:start' \
-d '' 'THE_SIGNED_UPLOAD_URL'

$ curl -v -X PUT \
--upload-file whatever.mp4 \
THE_URL_FROM_LOCATION_HEADER_OF_THE_ABOVE_RESPONSE

ref:
https://cloud.google.com/storage/docs/access-control/signed-urls#signing-resumable
https://cloud.google.com/storage/docs/uploading-objects
https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload
https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
https://cloud.google.com/storage/docs/xml-api/resumable-upload

Enable CORS For A Google Cloud Storage Bucket

$ gsutil cors get gs://your_bucket_name

$ cat cors.json
[
  {
    "origin": ["*"],
    "responseHeader": ["Content-Type", "x-goog-resumable:start"],
    "method": ["GET", "PUT", ""]
  }
]
$ gsutil cors set cors.json gs://your_bucket_name

ref:
https://cloud.google.com/storage/docs/gsutil/commands/cors
https://medium.com/imersotechblog/upload-files-to-google-cloud-storage-gcs-from-the-browser-159810bb11e3
http://andrewvos.com/uploading-files-directly-to-google-cloud-storage-from-client-side-javascript

Apex and Terraform: The easiest way to manage AWS Lambda functions

AWS Lambda lets you run code without provisioning or managing servers, the model known as serverless computing or Function as a Service (FaaS).

Apex is a Go command-line tool for managing and deploying your serverless functions on AWS Lambda. Apex also integrates with Terraform to provide cloud infrastructure management, for instance, configuring your AWS Lambda functions with Amazon API Gateway.

ref:
https://aws.amazon.com/lambda/
https://aws.amazon.com/api-gateway/
https://github.com/apex/apex

You could browse projects created in this post on GitHub:
https://github.com/vinta/pangu.space
https://github.com/CodeTengu/LambdaBaku

Install

$ curl https://raw.githubusercontent.com/apex/apex/master/install.sh | sh

ref:
https://apex.run/#installation

Initialize

It is recommended to configure your AWS credentials with awscli.

$ pip install awscli
$ aws configure

ref:
https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html

To use Apex to manage Lambda functions, you have to make sure your AWS credentials have at least the following IAM permissions:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "iam:CreateRole",
        "iam:CreatePolicy",
        "iam:AttachRolePolicy",
        "iam:PassRole",
        "lambda:GetFunction",
        "lambda:ListFunctions",
        "lambda:CreateFunction",
        "lambda:DeleteFunction",
        "lambda:InvokeFunction",
        "lambda:GetFunctionConfiguration",
        "lambda:UpdateFunctionConfiguration",
        "lambda:UpdateFunctionCode",
        "lambda:CreateAlias",
        "lambda:UpdateAlias",
        "lambda:GetAlias",
        "lambda:ListAliases",
        "lambda:ListVersionsByFunction",
        "logs:FilterLogEvents",
        "cloudwatch:GetMetricStatistics"
      ],
      "Effect": "Allow",
      "Resource": "*"
    }
  ]
}

$ apex init

ref:
https://apex.run/#getting-started

After running apex init, Apex creates a Role and a Policy. You should be able to find them in the AWS IAM Management Console. If you want to access other AWS resources in your Lambda functions, for instance, S3 buckets, DynamoDB tables, or SNS, you must create a new Policy that grants the appropriate permissions and attach it to the Role that Apex created.

Here is a Policy example for operating certain DynamoDB tables:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt123456789",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:ap-northeast-1:123456789:table/CodeTengu_Preference",
                "arn:aws:dynamodb:ap-northeast-1:123456789:table/CodeTengu_Preference/*",
                "arn:aws:dynamodb:ap-northeast-1:123456789:table/CodeTengu_WeeklyIssue",
                "arn:aws:dynamodb:ap-northeast-1:123456789:table/CodeTengu_WeeklyIssue/*",
                "arn:aws:dynamodb:ap-northeast-1:123456789:table/CodeTengu_WeeklyPost",
                "arn:aws:dynamodb:ap-northeast-1:123456789:table/CodeTengu_WeeklyPost/*"
            ]
        }
    ]
}

Write Lambda Functions

ref:
https://docs.aws.amazon.com/lambda/latest/dg/current-supported-versions.html
https://docs.aws.amazon.com/lambda/latest/dg/best-practices.html

Node.js

The simplest handler:

const aws = require('aws-sdk');

exports.handle = (event, context, callback) => {
  doYourShit();
  callback(null, 'DONE');
};

ref:
https://docs.aws.amazon.com/lambda/latest/dg/programming-model.html

Call another Lambda function from a Lambda function:

You must make sure your Lambda role has permission to invoke other Lambda functions.

const util = require('util');

const aws = require('aws-sdk');

const lambda = new aws.Lambda();

const params = {
  FunctionName: 'LambdaBaku_syncIssue',
  InvocationType: 'Event', // means asynchronous execution
  Payload: JSON.stringify({ issue_number: curatedIssue.number }),
};

lambda.invoke(params, (err, data) => {
  if (err) {
    console.log('FAIL', params);
    console.log(util.inspect(err));
  } else {
    console.log(data);
  }
});

ref:
https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html
https://stackoverflow.com/questions/31714788/can-an-aws-lambda-function-call-another

Go

Write a Lambda function triggered by Amazon API Gateway:

package main

import (
    "encoding/json"
    "errors"
    "log"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/vinta/pangu"
)

var (
    // ErrTextNotProvided is thrown when text is not provided in HTTP query string
    ErrTextNotProvided = errors.New("No text was provided in HTTP query string")
)

// Handler is the AWS Lambda function handler
func Handler(request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
    log.Printf("request id: %s\n", request.RequestContext.RequestID)

    text, ok := request.QueryStringParameters["t"]
    if !ok {
        errMap := map[string]string{
            "message": ErrTextNotProvided.Error(),
        }
        errMapJSON, _ := json.MarshalIndent(errMap, "", " ")

        return events.APIGatewayProxyResponse{
            Body: string(errMapJSON),
            StatusCode: 400,
        }, nil
    }

    log.Printf("text: %s\n", text)

    textPlainHeaders := map[string]string{
        "content-type": "text/plain; charset=utf-8",
    }

    return events.APIGatewayProxyResponse{
        Body: pangu.SpacingText(text),
        Headers: textPlainHeaders,
        StatusCode: 200,
    }, nil
}

func main() {
    lambda.Start(Handler)
}

ref:
https://aws.amazon.com/blogs/compute/announcing-go-support-for-aws-lambda/
https://docs.aws.amazon.com/lambda/latest/dg/go-programming-model-handler-types.html
https://docs.aws.amazon.com/lambda/latest/dg/go-programming-model-errors.html

Your "Integration Request" configurations in API Gateway should be like:

  • Integration type: Lambda Function
  • Use Lambda Proxy integration: Yes
  • Lambda Region: ap-northeast-1
  • Lambda Function: panguspace_spacing_text
  • Invoke with caller credentials: No
  • Credentials cache: Do not add caller credentials to cache key
  • Use Default Timeout: Yes

It's also worth noting that the API response is mainly defined by the APIGatewayProxyResponse in your Lambda function code; the "Integration Response" and "Method Response" configurations in API Gateway do not matter here.

ref:
https://docs.aws.amazon.com/apigateway/latest/developerguide/getting-started-with-lambda-integration.html

Usage

Deploy all functions:

$ apex deploy

ref:
https://apex.run/#deploying-functions

Invoke a function:

# invoke a function directly
$ apex invoke spacing_text --logs
{
    "statusCode": 400,
    "headers": null,
    "body":"{\"message\": \"No text was provided in the HTTP query string\"}"
}

# invoke a function with an API Gateway event
$ cat fixtures/spacing_text_event.json
{
    "queryStringParameters": {"t": "與PM戰鬥的人,應當小心自己不要成為PM"}
}
$ apex invoke spacing_text --logs < fixtures/spacing_text_event.json
{
    "statusCode": 200,
    "headers": {"content-type": "text/plain; charset=utf-8"},
    "body": "與 PM 戰鬥的人,應當小心自己不要成為 PM"
}

ref:
https://apex.run/#invoking-functions

View logs, which might be delayed several seconds:

$ apex logs -f

Pack a function:

$ apex build spacing_text > spacing_text.zip

Configure API Gateway

Create API Keys

To set up API keys, do the following:

  1. Configure your API methods to require an API key
  2. Deploy your API
  3. Create an API key for the API in a region
  4. Create a Usage Plan and assign an API key to a certain Stage

In step 1, your "Method Request" configuration in API Gateway should look like this:

  • Authorization: NONE
  • Request Validator: NONE
  • API Key Required: true

Now you are able to call the API with an x-api-key header:

$ curl -H "x-api-key: YOUR-API-KEY" https://xxx.execute-api.ap-northeast-1.amazonaws.com/v1/your-endpoint/

ref:
https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-create-usage-plans-with-rest-api.html
https://docs.aws.amazon.com/apigateway/latest/developerguide/how-to-use-postman-to-call-api.html

Actually, you could release your APIs without API keys if you like.

Setup a Custom Domain

To set up a custom domain managed by Cloudflare, see the following link:
https://stackoverflow.com/a/46061708/885524

It is worth noting that although the Stack Overflow answer says to use the Full (Strict) SSL mode, plain Full also works.

Moreover, it might take a long time for the "Target Domain Name" (xxx.cloudfront.net) to be generated.

Don't forget to add "Base Path Mappings" in API Gateway Custom Domain Names:

  • api.pangu.space
    • Target Domain Name: xxx.cloudfront.net
    • ACM Certificate: *.pangu.space
    • Base Path Mappings:
      • Path: /v1
      • Destination: Pangu:v1

Manage Infrastructures with Terraform

Terraform is a tool to manage your cloud infrastructure as code.

$ brew install terraform

$ tree .
.
├── functions
│   ├── introduce
│   │   └── main.go
│   └── spacing_text
│       └── main.go
└── infrastructure
    ├── main.tf
    └── variables.tf

Define variables and data sources:

# infrastructure/variables.tf
data "aws_caller_identity" "current" {}

variable "aws_region" {}
variable "apex_environment" {}
variable "apex_function_role" {}

variable "apex_function_arns" {
  type = "map"
}

variable "apex_function_names" {
  type = "map"
}

variable "apex_function_introduce" {}
variable "apex_function_spacing_text" {}

ref:
https://www.terraform.io/docs/providers/aws/d/caller_identity.html

Define AWS resources:

# infrastructure/main.tf
resource "aws_api_gateway_rest_api" "pangu" {
  name = "Pangu"
}

resource "aws_api_gateway_method" "pangu_root" {
  rest_api_id   = "${aws_api_gateway_rest_api.pangu.id}"
  resource_id   = "${aws_api_gateway_rest_api.pangu.root_resource_id}"
  http_method   = "GET"
  authorization = "NONE"
}

resource "aws_api_gateway_integration" "pangu_root_get" {
  rest_api_id             = "${aws_api_gateway_rest_api.pangu.id}"
  resource_id             = "${aws_api_gateway_rest_api.pangu.root_resource_id}"
  http_method             = "${aws_api_gateway_method.pangu_root.http_method}"
  integration_http_method = "POST"
  type                    = "AWS_PROXY"
  uri                     = "arn:aws:apigateway:${var.aws_region}:lambda:path/2015-03-31/functions/${var.apex_function_introduce}/invocations"
}

resource "aws_api_gateway_method_response" "pangu_root_get_200" {
  rest_api_id = "${aws_api_gateway_rest_api.pangu.id}"
  resource_id = "${aws_api_gateway_rest_api.pangu.root_resource_id}"
  http_method = "${aws_api_gateway_method.pangu_root.http_method}"
  status_code = "200"

  response_models = {
    "application/json" = "Empty"
  }

  response_parameters = {
    "method.response.header.Access-Control-Allow-Origin" = true
  }
}

resource "aws_api_gateway_resource" "pangu_spacing_text" {
  rest_api_id = "${aws_api_gateway_rest_api.pangu.id}"
  parent_id   = "${aws_api_gateway_rest_api.pangu.root_resource_id}"
  path_part   = "spacing-text"
}

resource "aws_api_gateway_method" "pangu_spacing_text_get" {
  rest_api_id      = "${aws_api_gateway_rest_api.pangu.id}"
  resource_id      = "${aws_api_gateway_resource.pangu_spacing_text.id}"
  http_method      = "GET"
  authorization    = "NONE"
  api_key_required = true
}

resource "aws_api_gateway_integration" "pangu_spacing_text_get" {
  rest_api_id             = "${aws_api_gateway_rest_api.pangu.id}"
  resource_id             = "${aws_api_gateway_resource.pangu_spacing_text.id}"
  http_method             = "${aws_api_gateway_method.pangu_spacing_text_get.http_method}"
  integration_http_method = "POST"
  type                    = "AWS_PROXY"
  uri                     = "arn:aws:apigateway:${var.aws_region}:lambda:path/2015-03-31/functions/${var.apex_function_spacing_text}/invocations"
}

resource "aws_api_gateway_method_response" "pangu_spacing_text_get_200" {
  rest_api_id = "${aws_api_gateway_rest_api.pangu.id}"
  resource_id = "${aws_api_gateway_resource.pangu_spacing_text.id}"
  http_method = "${aws_api_gateway_method.pangu_spacing_text_get.http_method}"
  status_code = "200"

  response_models = {
    "application/json" = "Empty"
  }

  response_parameters = {
    "method.response.header.Access-Control-Allow-Origin" = true
  }
}

resource "aws_api_gateway_deployment" "pangu" {
  depends_on = [
    "aws_api_gateway_method.pangu_root",
    "aws_api_gateway_integration.pangu_root_get",
    "aws_api_gateway_method_response.pangu_root_get_200",
    "aws_api_gateway_resource.pangu_spacing_text",
    "aws_api_gateway_method.pangu_spacing_text_get",
    "aws_api_gateway_integration.pangu_spacing_text_get",
    "aws_api_gateway_method_response.pangu_spacing_text_get_200",
  ]

  rest_api_id = "${aws_api_gateway_rest_api.pangu.id}"
  stage_name  = "v1"
}

resource "aws_lambda_permission" "pangu_root_get" {
  statement_id  = "AllowInvokeFromAPIGateway"
  action        = "lambda:InvokeFunction"
  function_name = "${var.apex_function_introduce}"
  principal     = "apigateway.amazonaws.com"

  source_arn = "arn:aws:execute-api:${var.aws_region}:${data.aws_caller_identity.current.account_id}:${aws_api_gateway_rest_api.pangu.id}/*/${aws_api_gateway_integration.pangu_root_get.http_method}/"
}

resource "aws_lambda_permission" "pangu_spacing_text" {
  statement_id  = "AllowInvokeFromAPIGateway"
  action        = "lambda:InvokeFunction"
  function_name = "${var.apex_function_spacing_text}"
  principal     = "apigateway.amazonaws.com"

  source_arn = "arn:aws:execute-api:${var.aws_region}:${data.aws_caller_identity.current.account_id}:${aws_api_gateway_rest_api.pangu.id}/*/${aws_api_gateway_integration.pangu_spacing_text_get.http_method}${aws_api_gateway_resource.pangu_spacing_text.path}"
}

ref:
https://www.terraform.io/docs/providers/aws/guides/serverless-with-aws-lambda-and-api-gateway.html

# download provider plugins
$ apex infra init

# view the generated execution plan
$ apex infra plan

# deploy your infrastructure
$ apex infra apply
$ apex infra apply -auto-approve

ref:
https://apex.run/#managing-infrastructure

Remotely debug a Python app inside a Docker container in Visual Studio Code

Visual Studio Code with the Python extension has a "Remote Debugging" feature, which means you can attach to a real remote host as well as to a container on localhost. NOTE: while you trace Python code, the "Step Into" functionality is your good friend.

In this article, we are going to debug a Flask app inside a local Docker container through VS Code's fancy debugger, while still being able to leverage Flask's auto-reloading mechanism. The approach should apply to other Python apps as well.

ref:
https://code.visualstudio.com/docs/editor/debugging
https://code.visualstudio.com/docs/python/debugging#_remote-debugging

Install

Install ptvsd on both the host OS and in the container.

$ pip3 install -U ptvsd

2018.10.22 updated:

Visual Studio Code supports ptvsd 4 now!

ref:
https://github.com/Microsoft/ptvsd

Prepare

A few materials and configurations are needed. Assume you have a Dockerized Python Flask application like the following:

# Dockerfile
FROM python:3.6.6-alpine3.7 AS builder

WORKDIR /usr/src/app/

RUN apk add --no-cache --virtual .build-deps \
    build-base \
    openjpeg-dev \
    openssl-dev \
    zlib-dev

COPY requirements.txt .
RUN pip install --user -r requirements.txt

FROM python:3.6.6-alpine3.7 AS runner

ENV PATH=$PATH:/root/.local/bin
ENV FLASK_APP=app.py

WORKDIR /usr/src/app/

RUN apk add --no-cache --virtual .run-deps \
    openjpeg \
    openssl

EXPOSE 8000/tcp

COPY --from=builder /root/.local/ /root/.local/
COPY . .

CMD ["flask", "run"]

# docker-compose.yml
version: '3'
services:
    db:
        image: mongo:3.6
        ports:
            - "27017:27017"
        volumes:
            - ../data/mongodb:/data/db
    cache:
        image: redis:4.0
        ports:
            - "6379:6379"
    web:
        build: .
        command: .docker-assets/start-web.sh
        ports:
            - "3000:3000"
            - "8000:8000"
        volumes:
            - .:/usr/src/app
            - ../vendors:/root/.local
        depends_on:
            - db
            - cache

Usage

Method 1: Debug with --no-debugger, --reload and --without-threads

The convenient but slightly fragile way: with auto-reloading enabled, you can change your source code on the fly. However, you might find that this method is much slower for the debugger to attach; it seems --reload is not fully compatible with Remote Debugging.

We put the ptvsd bootstrap code in sitecustomize.py; as a result, ptvsd runs every time auto-reloading is triggered.

Steps:

  1. Set breakpoints
  2. Run your Flask app with --no-debugger, --reload and --without-threads
  3. Start the debugger with {"type": "python", "request": "attach", "preLaunchTask": "Enable remote debug"}
  4. Add ptvsd code to site-packages/sitecustomize.py by the pre-launch task automatically
  5. Click "Debug Anyway" button
  6. Access the part of the code that contains breakpoints

# site-packages/sitecustomize.py
try:
    import socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.close()
    import ptvsd
    ptvsd.enable_attach('my_secret', address=('0.0.0.0', 3000))
    print('ptvsd is started')
    # ptvsd.wait_for_attach()
    # print('debugger is attached')
except OSError as exc:
    print(exc)

ref:
https://docs.python.org/3/library/site.html

# .docker-assets/start-web.sh
rm -f /root/.local/lib/python3.6/site-packages/sitecustomize.py
pip3 install --user -r requirements.txt ptvsd
python -m flask run -h 0.0.0.0 -p 8000 --no-debugger --reload --without-threads

// .vscode/tasks.json
{
    "version": "2.0.0",
    "tasks": [
        {
            "label": "Enable remote debug",
            "type": "shell",
            "isBackground": true,
            "command": " docker cp sitecustomize.py project_web_1:/root/.local/lib/python3.6/site-packages/sitecustomize.py"
        }
    ]
}

ref:
https://code.visualstudio.com/docs/editor/tasks

// .vscode/launch.json
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Attach",
            "type": "python",
            "request": "attach",
            "localRoot": "${workspaceFolder}",
            "remoteRoot": "/usr/src/app",
            "port": 3000,
            "secret": "my_secret",
            "host": "localhost",
            "preLaunchTask": "Enable remote debug"
        }
    ]
}

ref:
https://code.visualstudio.com/docs/editor/debugging#_launch-configurations

Method 2: Debug with --no-debugger and --no-reload

The inconvenient but more reliable way: if you change any Python code, you need to restart the Flask app and re-attach the debugger in Visual Studio Code.

Steps:

  1. Set breakpoints
  2. Add ptvsd code to your FLASK_APP file
  3. Run your Flask app with --no-debugger and --no-reload
  4. Start the debugger with {"type": "python", "request": "attach"}
  5. Access the part of the code that contains breakpoints

# in app.py
import ptvsd
ptvsd.enable_attach('my_secret', address=('0.0.0.0', 3000))
print('ptvsd is started')
# ptvsd.wait_for_attach()
# print('debugger is attached')

ref:
http://ramkulkarni.com/blog/debugging-django-project-in-docker/

# .docker-assets/start-web.sh
pip3 install --user -r requirements.txt ptvsd
python -m flask run -h 0.0.0.0 -p 8000 --no-debugger --no-reload

// .vscode/launch.json
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Attach",
            "type": "python",
            "request": "attach",
            "localRoot": "${workspaceFolder}",
            "remoteRoot": "/usr/src/app",
            "port": 3000,
            "secret": "my_secret",
            "host": "localhost"
        }
    ]
}

Method 3: Just don't use Remote Debugging, Run Debugger locally

You just run your Flask app on localhost (macOS) instead of putting it in a container. However, you can still host your database, cache server, and message queue inside containers; your Python app communicates with those services through ports exposed on 127.0.0.1. Therefore, you can use VS Code's debugger without any strange tricks.

In practice, it is okay for your local development environment to differ from the production environment.

# docker-compose.yml
version: '3'
services:
    db:
        image: mongo:3.6
        ports:
            - "27017:27017"
        volumes:
            - mongo-volume:/data/db
    cache:
        image: redis:4.0
        ports:
            - "6379:6379"
// .vscode/launch.json
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Flask",
            "type": "python",
            "request": "launch",
            "module": "flask",
            "console": "none",
            "pythonPath": "${config:python.pythonPath}",
            "cwd": "${workspaceFolder}",
            "envFile": "${workspaceFolder}/.env",
            "args": [
                "run",
                "--host=0.0.0.0",
                "--port=8000",
                "--no-debugger",
                "--no-reload"
            ],
            "jinja": true,
            "gevent": false
        }
    ]
}

Sadly, you cannot use --reload while launching your app in the debugger. Nevertheless, most of the time you don't really need the debugger: a fast auto-reloading workflow is good enough. All you need is a Makefile for running the Flask app and the Celery worker on macOS: make run_web and make run_worker.

# Makefile
install:
    pipenv install
    pipenv run pip install ptvsd==4.1.1
    pipenv run pip install git+https://github.com/gorakhargosh/watchdog.git

shell:
    pipenv run python -m flask shell

run_web:
    pipenv run python -m flask run -h 0.0.0.0 -p 8000 --debugger --reload

run_worker:
    pipenv run watchmedo auto-restart -d . -p '*.py' -R -- celery -A app:celery worker --pid= -P gevent --without-gossip --prefetch-multiplier 1 -Ofair -l debug --purge

Bonus

You should try enabling debug.inlineValues, which shows variable values inline in the editor while debugging. It's awesome!

// settings.json
{
    "debug.inlineValues": true
}

ref:
https://code.visualstudio.com/updates/v1_9#_inline-variable-values-in-source-code

Issues

Starting the Python debugger is fucking slow
https://github.com/Microsoft/vscode-python/issues/106

Debugging library functions won't work currently
https://github.com/Microsoft/vscode-python/issues/111

Pylint for remote projects
https://gist.github.com/IBestuzhev/d022446f71267591be76fb48152175b7

Run a Celery task at a specific time

Schedule Tasks

You are able to run any Celery task at a specific time through the eta (which means "Estimated Time of Arrival") parameter.

import datetime

import celery

@celery.shared_task(bind=True)
def add_tag(task, user_id, tag):
    User.objects.filter(id=user_id, tags__ne=tag).update(push__tags=tag)
    return True

user_id = '582ee32a5b9c861c87dc297e'
tag = 'new_tag'
started_at = datetime.datetime(2018, 3, 12, tzinfo=datetime.timezone.utc)
add_tag.apply_async((user_id, tag), eta=started_at)
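
If you only need a relative delay rather than an absolute time, the countdown shortcut does the same thing:

# run the task 60 seconds from now
add_tag.apply_async((user_id, tag), countdown=60)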

ref:
http://docs.celeryproject.org/en/master/userguide/calling.html#eta-and-countdown

Revoke Tasks

Revoked tasks will be discarded by workers when their eta arrives.

from celery.result import AsyncResult

AsyncResult(task_id).revoke()
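
To revoke a scheduled task later, keep the task id returned by apply_async (a sketch reusing add_tag above):

result = add_tag.apply_async((user_id, tag), eta=started_at)
# persist result.id somewhere, then later:
AsyncResult(result.id).revoke()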

ref:
http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.AsyncResult.revoke

Revoking tasks works by sending a broadcast message to all the workers, the workers then keep a list of revoked tasks in memory. When a worker starts up it will synchronize revoked tasks with other workers in the cluster.

The list of revoked tasks is in-memory so if all workers restart the list of revoked ids will also vanish. If you want to preserve this list between restarts you need to specify a file for these to be stored in by using the --statedb argument to celery worker.
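
For example (the app module and state file path are placeholders):

$ celery -A app:celery worker --statedb=/var/run/celery/worker.state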

ref:
http://docs.celeryproject.org/en/latest/userguide/workers.html#worker-persistent-revokes