MISC

异常行为溯源 | @Luminoria @Ron

某企业网络安全部门人员正在对企业网络资产受到的攻击行为进行溯源分析,该工作人员发现攻击者删除了一段时间内的访问日志数据,但是攻击者曾传输过已被删除的访问日志数据并且被流量监控设备捕获,工作人员对流量数据进行了初步过滤并提取出了相应数据包。已知该攻击者在开始时曾尝试低密度的攻击,发现未被相关安全人员及时发现后进行了连续多日的攻击,请协助企业排查并定位攻击者IP,flag格式为:flag{md5(IP)}

出现了三种协议:TCP、giFT 和 X11

只把 TCP 提取出来的 log 并不完整,缺少了很多东西,giFT 数据量太少且不具有参考价值

但是可以看到数据的格式

最开始是经过 Base64 编码的内容,例如

1
eyJtc2ciOiJNemt1TVRrekxqWTJMakkwTlMwZ0xTQmJNRGd2U21GdUx6SXdNalU2TVRrNk5EUTZOVGtnS3pBd01EQmRJQ0pIUlZRZ0wyRndjQzlzYVhOMExuQm9jQ0JJVkZSUUx6RXVNU0lnTWpBd0lETTBOek1nSWkwaUlDSlBjR1Z5WVM4NUxqWTRMaWhZTVRFN0lFeHBiblY0SUdrMk9EWTdJR2xrTFVsRUtTQlFjbVZ6ZEc4dk1pNDVMakUzT0NCV1pYSnphVzl1THpFeExqQXdJZ289IiwidHlwZSI6IkxvZy1EYXRhIn0=

经过解码后是 json

1
{"msg":"MzkuMTkzLjY2LjI0NS0gLSBbMDgvSmFuLzIwMjU6MTk6NDQ6NTkgKzAwMDBdICJHRVQgL2FwcC9saXN0LnBocCBIVFRQLzEuMSIgMjAwIDM0NzMgIi0iICJPcGVyYS85LjY4LihYMTE7IExpbnV4IGk2ODY7IGlkLUlEKSBQcmVzdG8vMi45LjE3OCBWZXJzaW9uLzExLjAwIgo=","type":"Log-Data"}

发现 msg 字段也是 base64 编码内容,解码得到

1
39.193.66.245- - [08/Jan/2025:19:44:59 +0000] "GET /app/list.php HTTP/1.1" 200 3473 "-" "Opera/9.68.(X11; Linux i686; id-ID) Presto/2.9.178 Version/11.00"

所以我们可以得知数据流的格式应该为

1
base64.b64encode(json.dump({"msg": base64.b64encode("RAW_HTTP_LOG"), "type": "Log-Data"}))

重点应该在 X11 协议的数据,通过 Python 写个脚本提取一下

1
2
3
4
5
6
7
8
9
10
11
12
13
from scapy.all import rdpcap, Raw

packets = rdpcap("network_traffic.pcap")
count = 0
with open("output.bin", "wb") as f:
for pkt in packets:
count += 1
print(f"extracting...{count}",end='\r')
if Raw in pkt:
f.write(pkt[Raw].load)
f.write(b'\n')
else:
print(f"err when extract pkg {count}")

在用赛博厨子来提取一下(菜谱如下)

1
https://gchq.github.io/CyberChef/#recipe=From_Base64('A-Za-z0-9%2B/%3D',true,false)Regular_expression('User%20defined','%7B%22msg%22:%22(.*?)%22,%22type%22:%22Log-Data%22%7D',true,false,false,false,false,false,'List%20capture%20groups')From_Base64('A-Za-z0-9%2B/%3D',true,false)

得到日志文件,可以看到有一个很明显的日志

1
35.127.46.111 - - [09/Jan/2025:16:10:55 +0000] "POST /config_update.php HTTP/1.1" 200 3677 "-" "Mozilla/5.0 (compatible; MSIE 9.0; Windows 98; Win 9x 4.90; Trident/3.1)"

访问的是关键路径 config_update.php 而且进行了多次访问同一路径,符合题目中描述的攻击,得到 IP 为 35.127.46.111

再 MD5 一下,得到最后的 flag 为 flag{475ed6d7f74f586fb265f52eb42039b6}

数据校验 | @Luminoria

某平台发现部分用户的信息不满足平台的数据合规要求,现让你来协助分析平台用户数据。

请将所有不合规数据的序列号按照从小到大的顺序用 _ 连接后MD5并包上 flag{} 提交。例如,不合规的数据序列号为6、38、1680,则连接后为 6_38_1680 ,flag为 flag{1ffcb6d1c2108cd54a7743a6f91a289a}

数据清洗题目,从题目提供的 PDF 可以得到校验规则

  • 用户名 username 格式必须为 User- +字符串
  • UserName_Check 为用户名的32位小写 MD5 值
  • Password 密码只能出现大小写字母和数字
  • Password_Check 为密码的32为小写 MD5 值
  • IP 为用户的 IP 地址
  • Signature 用户签名值,对用户名采用 ECDSA 算法进行数字签名

所以用 Python 写个脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import hashlib
import string
import re
from ecdsa import SigningKey, BadSignatureError, VerifyingKey
from tqdm import tqdm
import base64
wrong_data = []

with open("数据校验/data.csv") as f:
for line in tqdm(f.readlines()):
serial, username, namecheck, passwd, passcheck, ip, sign = line.split(",")
if serial == "Serial_Number":
continue
# 检验用户名格式
if not username.startswith("User-"):
wrong_data.append(serial)
print(f"{serial} 用户名不合规!")
continue
# 检验用户名 md5
if hashlib.md5(username.encode()).hexdigest() != namecheck:
wrong_data.append(serial)
print(f"{serial} 用户名校验失败!")
continue
# 检验密码是否存在非法字符
for char in passwd:
pass_wrong = 0
if char not in string.ascii_letters + string.digits:
wrong_data.append(serial)
pass_wrong = 1
break
if pass_wrong:
print(f"{serial} 密码存在非法字符!")
continue
# 检验密码 md5
if hashlib.md5(passwd.encode()).hexdigest() != passcheck:
wrong_data.append(serial)
print(f"{serial} 密码校验失败!")
continue
# 检验 IP 地址格式
if not re.match(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$", ip):
wrong_data.append(serial)
print(f"{serial} IP 地址格式错误!")
continue
# 校验签名
vk = VerifyingKey.from_pem(open(f"数据校验/ecdsa-key/{serial}.pem", "rt").read())
try:
# 将 Base64 编码的签名解码为字节
signature_bytes = base64.b64decode(sign)
# 验证签名,默认使用 SHA-1
vk.verify(signature_bytes, username.encode())
except BadSignatureError:
print(f"{serial} 签名校验失败!")
wrong_data.append(serial)
except ValueError as e:
print(f"{serial} 签名解码失败!错误:{e}")
wrong_data.append(serial)

print("_".join(wrong_data))
with open("数据校验/output.txt", "wt") as f:
f.write("_".join(wrong_data))

得到数据不合格的用户连接起来后为 4604_13960_20213_22586_29216_33270_40123_42949_45567

他们的问题分别是

  • 4604 用户名不合规!
  • 13960 用户名不合规!
  • 20213 用户名校验失败!
  • 22586 用户名校验失败!
  • 29216 密码存在非法字符!
  • 33270 密码校验失败!
  • 40123 IP 地址格式错误!
  • 42949 IP 地址格式错误!
  • 45567 签名校验失败!

再经过 MD5 计算得到 flag{6ba2c3dd58a321f8b75e75ecc2e06663}

Strange Database | @Luminoria | 未出

企业的数据库信息和密钥在传输过程中被截获了,请分析以下数据并获取flag.

给了一堆 sqlite3 的数据库文件和经过 OAEP 加密过后的 RSA 私钥

写个脚本提取一下数据库,便于后面处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import os
import sqlite3
import pandas as pd
from glob import glob

DB_FOLDER = "./database"
OUTPUT_FOLDER = "data"

os.makedirs(OUTPUT_FOLDER, exist_ok=True)

db_files = glob(os.path.join(DB_FOLDER, "*.db"))

for db_file in db_files:
try:
# 连接数据库
conn = sqlite3.connect(db_file)
cursor = conn.cursor()

# 获取数据库名称(去掉路径和后缀)
db_name = os.path.splitext(os.path.basename(db_file))[0]

# 为当前数据库创建一个子文件夹存放 CSV
db_output_folder = os.path.join(OUTPUT_FOLDER, db_name)
os.makedirs(db_output_folder, exist_ok=True)

# 获取数据库中的所有表
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()

for table in tables:
table_name = table[0]
try:
# 读取表数据
df = pd.read_sql_query(f"SELECT * FROM {table_name};", conn)

# 保存为 CSV
csv_file = os.path.join(db_output_folder, f"{table_name}.csv")
df.to_csv(csv_file, index=False, encoding="utf-8-sig")

print(f"已导出: {csv_file}")
except Exception as table_error:
print(f"导出表 {table_name} 失败: {table_error}")

# 关闭数据库连接
conn.close()
except Exception as e:
print(f"处理数据库 {db_file} 时出错: {e}")

然后再写个脚本来解密一下(注:文件名中有私钥的密码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import serialization, hashes
import base64
import glob
import os
from tqdm import tqdm

KEY_FOLDER = "key"
PLAIN_DATA_FOLDER = "data"

KEYS = sorted(glob.glob(f"{KEY_FOLDER}/*.pem"), key=lambda x: int(x.split("-")[1]))
DATA_FILES = sorted(
glob.glob(f"{PLAIN_DATA_FOLDER}/database-*/accounts.csv"),
key=lambda x: int(os.path.basename(os.path.dirname(x)).split("-")[1]),
)

all_data = [] # 所有数据
data_chunk = [] # 分块数据

remarks = open("output/remarks.txt", "wt")


def decrypt_data(data: str, key_file: str) -> str:
# 读取私钥
with open(key_file, "rb") as f:
private_key = serialization.load_pem_private_key(
f.read(),
password=key_file.split("-")[-1].replace(".pem", "").encode(),
)

# Base64 解码
encrypted_data = base64.b64decode(data)

# 解密
decrypted_data = private_key.decrypt(
encrypted_data,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA1(),
label=None,
),
)

return decrypted_data.decode()


if __name__ == "__main__":
for i in tqdm(range(0, 500)): # 表示数据索引
with open(DATA_FILES[i]) as f:
data = f.readlines()[1:] # 跳过首行
file_data = []
for line in data:
line_data = []
splited_data = line.split(",")
for idx in range(5): # 表示列索引
if idx == 1:
line_data.append(splited_data[idx])
continue
decrypted_data = decrypt_data(splited_data[idx], str(KEYS[i]))
line_data.append(decrypted_data)
if len(all_data) != i + 1:
all_data.append([])
if idx == 4:
remarks.write(decrypted_data)
all_data.append(",".join(line_data))
file_data.append(",".join(line_data))

with open(f"output/{i}.csv", "w") as f:
f.write("\n".join(file_data))

with open(f"output/all.csv", "a") as f:
f.write("\n".join(all_data[1:]))

然后可以导出看到数据,但是搜关键词找不到,想到可能在 remark 列里面的内容会藏有 flag,于是提取出来,发现还是没有 flag

那好吧,这分确实拿不到 =-=

Web | @Rusty

简单的仓库

一个简单的仓库系统,但是你会用他吗?

目录扫描

1
2
3
4
5
6
7
8
Target: http://eci-2zefyydc0iza52rh6cjw.cloudeci1.ichunqiu.com/

[11:12:48] Starting:
[11:13:11] 500 - 290B - /download/history.csv
[11:13:11] 500 - 290B - /download/users.csv
[11:13:17] 200 - 2KB - /login
[11:13:25] 200 - 2KB - /register
[11:13:31] 405 - 178B - /upload

充值时发现修改权限等级可以实现直接充值

1
{"amount":"999999","username":"rusty","permission":"admin"}

user改为admin可以发现hint,知道了flag的路径

发现/api/files显示目录有路径穿越

发现用户名是在file目录下分别有目录的,最后通过user实现路径穿越得到flag

补充

附上题目源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import shutil

from flask import Flask, render_template, request, redirect, url_for, session, send_from_directory
from flask_sqlalchemy import SQLAlchemy
from werkzeug.utils import secure_filename
import os
import re

app = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(64)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///warehouse.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)

class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(10), unique=True, nullable=False)
password = db.Column(db.String(20), nullable=False)
permission = db.Column(db.String(10), nullable=False, default='guest')
balance = db.Column(db.Integer, nullable=False, default=0)

def __init__(self, username, password, permission="guest"):
self.username = username
self.password = password
self.permission = permission
self.balance = 0

@app.before_first_request
def create_tables():
if os.path.exists('warehouse'):
shutil.rmtree('warehouse')
os.makedirs('warehouse/admin')
with open('warehouse/admin/readme.txt', 'w', encoding='utf-8') as f:
f.write('/var/tmp/flag.txt')

if os.path.exists('warehouse.db'):
os.remove('warehouse.db')
db.create_all()

user = User(username="admin", password="NeverLoginMe!hhh", permission="admin")
db.session.add(user)
db.session.commit()

def validate_input(text):
return bool(re.match('^[a-zA-Z0-9]{4,10}$', text))

@app.route('/')
def index():
return redirect(url_for('login'))

@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')

if not validate_input(username) or not validate_input(password):
return render_template('register.html', error='用户名和密码必须是4-10位的字母数字组合')

if User.query.filter_by(username=username).first():
return render_template('register.html', error='用户名已存在')

user = User(username=username, password=password)
db.session.add(user)
db.session.commit()

user_dir = os.path.join('warehouse', username)
os.makedirs(user_dir)
with open(os.path.join(user_dir, '使用小仓库开通vip请联系admin充值.txt'), 'w', encoding='utf-8') as f:
f.write('欢迎使用小仓库')
with open(os.path.join(user_dir, 'readme.txt'), 'w', encoding='utf-8') as f:
f.write('欢迎使用小仓库')

return redirect(url_for('login'))

return render_template('register.html')

@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')

if not validate_input(username) or not validate_input(password):
return render_template('login.html', error='用户名和密码必须是4-10位的字母数字组合')

user = User.query.filter_by(username=username, password=password).first()
if user:
session['user'] = username
return redirect(url_for('warehouse'))

return render_template('login.html', error='用户名或密码错误')

return render_template('login.html')

@app.route('/warehouse')
def warehouse():
if 'user' not in session:
return redirect(url_for('login'))

user = User.query.filter_by(username=session['user']).first()
if not user:
session.clear()
return redirect(url_for('login'))

return render_template('warehouse.html',
username=user.username,
permission=user.permission,
balance=user.balance)

@app.route('/upload', methods=['POST'])
def upload():
if 'user' not in session:
return '权限不足', 403

user = User.query.filter_by(username=session['user']).first()
if not user:
session.clear()
return redirect(url_for('login'))

if user.permission not in ['vip', 'admin']:
session.clear()
return redirect(url_for('login', error="错误,你不是vip"))

if 'file' not in request.files:
return redirect(url_for('warehouse'))
path = request.args.get('path', session['user'])

if path != user.username:
return '权限不足,只有管理员能对其他用户的仓库进行上传或下载', 403

file = request.files['file']
if file.filename == '':
return redirect(url_for('warehouse'))

filename = secure_filename(file.filename)
file.save(os.path.join('warehouse', path, filename))
return redirect(url_for('warehouse'))

@app.route('/download/<filename>')
def download(filename):
user = User.query.filter_by(username=session['user']).first()
if not user:
session.clear()
return redirect(url_for('login'))

if user.permission not in ['vip', 'admin']:
session.clear()
return '权限错误或不足', 403

target_user = request.args.get('user', session['user'])

return send_from_directory(os.path.join('warehouse', target_user), filename)

@app.route('/logout')
def logout():
session.clear()
return redirect(url_for('login'))

@app.route('/api/files', methods=['POST'])
def get_files():
if 'user' not in session:
return {'error': '权限不足'}, 403

user = User.query.filter_by(username=session['user']).first()
if not user:
session.clear()
return redirect(url_for('login'))

if user.permission not in ['guest', 'vip', 'admin']:
session.clear()
return {'error': '权限错误'}, 403

data = request.get_json()
target_user = data.get('path', session['user'])

base_dir = os.path.join('warehouse', target_user)
if not os.path.exists(base_dir) or not os.path.isdir(base_dir):
return {'error': '目录不存在'}, 404

files = os.listdir(base_dir)
return {'files': files}

@app.route('/api/recharge', methods=['POST'])
def recharge():
if 'user' not in session:
return {'error': '未登录'}, 403

data = request.get_json()
amount = data.get('amount')
target_user = data.get('username')
user_permission = data.get('permission')

if not amount or not amount.isdigit() or int(amount) <= 0:
return {'error': '充值金额无效'}, 400

amount = int(amount)
user = User.query.filter_by(username=target_user).first()
if not user:
return {'error': '用户不存在'}, 404

if user_permission not in ['guest', 'vip', 'admin']:
return {'error': '拒绝访问'}, 403

if user_permission == 'admin':
user.balance += amount
db.session.commit()
return {'message': f'充值成功,当前余额: {user.balance}'}
else:
return {'message': '充值请求已提交,等待管理员审核'}

@app.route('/api/upgrade', methods=['POST'])
def upgrade_to_vip():
if 'user' not in session:
return {'error': '未登录'}, 403

user = User.query.filter_by(username=session['user']).first()
if not user:
return {'error': '用户不存在'}, 404

if user.balance < 99999:
return {'error': '余额不足,无法开通VIP'}, 400

user.balance -= 99999
user.permission = 'vip'
db.session.commit()
session['permission'] = 'vip'
return {'message': 'VIP开通成功!'}

if __name__ == '__main__':
app.run(debug=True,host="0.0.0.0", port=5000)