Files
MC-Worldgen/export_mca.py
2025-12-02 18:38:45 -06:00

447 lines
14 KiB
Python

#!/usr/bin/env python3
import argparse
import math
import os
import struct
import time
import zlib
from concurrent.futures import ThreadPoolExecutor, as_completed
from io import BytesIO
from pathlib import Path
cache_dir = Path('.numba_cache').resolve()
cache_dir.mkdir(exist_ok=True)
os.environ.setdefault("NUMBA_CACHE_DIR", str(cache_dir))
from mc import block_ids, worldGen
from settings import PLAYER_POS
TAG_END = 0
TAG_BYTE = 1
TAG_INT = 3
TAG_LONG = 4
TAG_STRING = 8
TAG_LIST = 9
TAG_COMPOUND = 10
TAG_INT_ARRAY = 11
TAG_LONG_ARRAY = 12
DATA_VERSION = 2586 # Minecraft 1.15.2
DEFAULT_RADIUS = 256 # ~512 block diameter (~0.5 km)
CHUNK_HEIGHT = 256
SECTION_COUNT = CHUNK_HEIGHT // 16
BIOME_ID = 1 # Plains
def build_state_mapping():
mapping = {}
def set_state(name, block_name, props=None):
block_id = block_ids.get(name)
if block_id is None:
return
props_tuple = tuple(sorted((props or {}).items()))
mapping[block_id] = (block_name, props_tuple)
set_state("air", "minecraft:air")
set_state("bedrock", "minecraft:bedrock")
set_state("stone", "minecraft:stone")
set_state("dirt", "minecraft:dirt")
set_state("grass", "minecraft:grass_block", {"snowy": "false"})
set_state("water", "minecraft:water", {"level": "0"})
set_state("oak_log", "minecraft:oak_log", {"axis": "y"})
set_state("oak_leaves", "minecraft:oak_leaves", {"distance": "1", "persistent": "false"})
set_state("birch_log", "minecraft:birch_log", {"axis": "y"})
set_state("birch_leaves", "minecraft:birch_leaves", {"distance": "1", "persistent": "false"})
return mapping
BLOCK_STATE_LOOKUP = build_state_mapping()
DEFAULT_AIR_STATE = BLOCK_STATE_LOOKUP[block_ids["air"]]
def to_signed_long(value):
value &= (1 << 64) - 1
if value >= (1 << 63):
value -= 1 << 64
return value
def pack_bits(indices, bits_per_value):
if not indices:
return []
total_bits = len(indices) * bits_per_value
longs_needed = (total_bits + 63) // 64
result = [0] * longs_needed
for idx, value in enumerate(indices):
bit_index = idx * bits_per_value
long_id = bit_index // 64
offset = bit_index % 64
result[long_id] |= value << offset
spill = offset + bits_per_value - 64
if spill > 0:
result[long_id + 1] |= value >> (bits_per_value - spill)
return [to_signed_long(v) for v in result]
def pack_heightmap(values):
return pack_bits(values, 9)
def block_state_from_id(block_id):
return BLOCK_STATE_LOOKUP.get(block_id, DEFAULT_AIR_STATE)
def build_section(chunk_x, chunk_z, section_y, heightmap, tree_blocks):
palette = []
palette_lookup = {}
block_indices = []
has_blocks = False
for y in range(16):
global_y = section_y * 16 + y
if global_y >= CHUNK_HEIGHT:
break
for z in range(16):
global_z = chunk_z * 16 + z
for x in range(16):
global_x = chunk_x * 16 + x
block_id = tree_blocks.get((global_x, global_y, global_z))
if block_id is None:
block_id = worldGen.generateBlock(global_x, global_y, global_z)
state = block_state_from_id(block_id)
key = state
if key not in palette_lookup:
palette_lookup[key] = len(palette)
palette.append(key)
idx = palette_lookup[key]
block_indices.append(idx)
name = state[0]
if name != "minecraft:air":
has_blocks = True
if global_y + 1 > heightmap[x][z]:
heightmap[x][z] = global_y + 1
if not has_blocks:
return None
palette_entries = []
for name, props in palette:
entry = {"Name": name}
if props:
entry["Properties"] = dict(props)
palette_entries.append(entry)
palette_len = max(1, len(palette_entries))
bits = max(4, math.ceil(math.log2(palette_len))) if palette_len > 1 else 4
block_states = pack_bits(block_indices, bits) if block_indices else []
if not block_states:
block_states = [0]
return {
"Y": section_y,
"Palette": palette_entries,
"BlockStates": block_states,
"BlockLight": bytearray(2048),
"SkyLight": bytearray(2048),
}
def build_biome_array():
return [BIOME_ID] * 256
def build_chunk(chunk_x, chunk_z):
heightmap = [[0] * 16 for _ in range(16)]
sections = []
tree_blocks = worldGen.generate_chunk_trees(chunk_x, chunk_z)
for section_y in range(SECTION_COUNT):
section = build_section(chunk_x, chunk_z, section_y, heightmap, tree_blocks)
if section:
sections.append(section)
height_values = []
for z in range(16):
for x in range(16):
height_values.append(heightmap[x][z])
chunk = {
"chunk_x": chunk_x,
"chunk_z": chunk_z,
"sections": sections,
"heightmap": height_values,
"biomes": build_biome_array(),
}
return chunk
def write_string(buffer, value):
encoded = value.encode('utf-8')
buffer.write(struct.pack('>H', len(encoded)))
buffer.write(encoded)
def write_tag_header(buffer, tag_type, name):
buffer.write(bytes([tag_type]))
write_string(buffer, name)
def write_byte(buffer, name, value):
write_tag_header(buffer, TAG_BYTE, name)
buffer.write(struct.pack('>b', value))
def write_int(buffer, name, value):
write_tag_header(buffer, TAG_INT, name)
buffer.write(struct.pack('>i', value))
def write_long(buffer, name, value):
write_tag_header(buffer, TAG_LONG, name)
buffer.write(struct.pack('>q', value))
def write_string_tag(buffer, name, value):
write_tag_header(buffer, TAG_STRING, name)
write_string(buffer, value)
def write_int_array(buffer, name, values):
write_tag_header(buffer, TAG_INT_ARRAY, name)
buffer.write(struct.pack('>i', len(values)))
for val in values:
buffer.write(struct.pack('>i', val))
def write_long_array(buffer, name, values):
write_tag_header(buffer, TAG_LONG_ARRAY, name)
buffer.write(struct.pack('>i', len(values)))
for val in values:
buffer.write(struct.pack('>q', to_signed_long(val)))
def write_byte_array(buffer, name, data):
write_tag_header(buffer, 7, name)
buffer.write(struct.pack('>i', len(data)))
buffer.write(data)
def start_compound(buffer, name):
write_tag_header(buffer, TAG_COMPOUND, name)
def end_compound(buffer):
buffer.write(bytes([TAG_END]))
def write_list(buffer, name, tag_type, items, payload_writer=None):
write_tag_header(buffer, TAG_LIST, name)
if not items:
buffer.write(bytes([TAG_END]))
buffer.write(struct.pack('>i', 0))
return
buffer.write(bytes([tag_type]))
buffer.write(struct.pack('>i', len(items)))
for item in items:
payload_writer(buffer, item)
def write_palette_entry(buffer, entry):
write_string_tag(buffer, "Name", entry["Name"])
props = entry.get("Properties")
if props:
start_compound(buffer, "Properties")
for key, value in props.items():
write_string_tag(buffer, key, value)
end_compound(buffer)
buffer.write(bytes([TAG_END]))
def write_section(buffer, section):
write_byte(buffer, "Y", section["Y"])
write_long_array(buffer, "BlockStates", section["BlockStates"])
write_list(buffer, "Palette", TAG_COMPOUND, section["Palette"], write_palette_entry)
write_byte_array(buffer, "BlockLight", section["BlockLight"])
write_byte_array(buffer, "SkyLight", section["SkyLight"])
buffer.write(bytes([TAG_END]))
def chunk_to_nbt(chunk):
buffer = BytesIO()
start_compound(buffer, "")
write_int(buffer, "DataVersion", DATA_VERSION)
start_compound(buffer, "Level")
write_string_tag(buffer, "Status", "full")
write_long(buffer, "InhabitedTime", 0)
write_long(buffer, "LastUpdate", 0)
write_int(buffer, "xPos", chunk["chunk_x"])
write_int(buffer, "zPos", chunk["chunk_z"])
write_byte(buffer, "isLightOn", 1)
start_compound(buffer, "Heightmaps")
write_long_array(buffer, "MOTION_BLOCKING", chunk["heightmap_packed"])
end_compound(buffer)
write_int_array(buffer, "Biomes", chunk["biomes"])
write_list(buffer, "Sections", TAG_COMPOUND, chunk["sections"], write_section)
write_list(buffer, "Entities", TAG_COMPOUND, [])
write_list(buffer, "TileEntities", TAG_COMPOUND, [])
write_list(buffer, "TileTicks", TAG_COMPOUND, [])
end_compound(buffer)
end_compound(buffer)
return buffer.getvalue()
def assemble_chunk_bytes(chunk):
chunk["heightmap_packed"] = pack_heightmap(chunk["heightmap"])
return chunk_to_nbt(chunk)
def chunk_range(center, radius):
block_min = center - radius
block_max = center + radius
chunk_min = math.floor(block_min / 16)
chunk_max = math.floor(block_max / 16)
return chunk_min, chunk_max
def write_region_file(path, chunks):
offsets = bytearray(4096)
timestamps = bytearray(4096)
body = bytearray()
sector = 2
for index, chunk_bytes in chunks.items():
compressed = zlib.compress(chunk_bytes)
payload = struct.pack('>I', len(compressed) + 1) + bytes([2]) + compressed
padding = (-len(payload)) % 4096
if padding:
payload += b'\x00' * padding
sectors = len(payload) // 4096
offsets[index * 4:index * 4 + 3] = (sector.to_bytes(3, 'big'))
offsets[index * 4 + 3] = sectors
timestamps[index * 4:index * 4 + 4] = struct.pack('>I', int(time.time()))
body.extend(payload)
sector += sectors
data = bytearray(8192)
data[0:4096] = offsets
data[4096:8192] = timestamps
data.extend(body)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(data)
def export_region_chunks(chunk_map, output_dir):
regions = {}
for (chunk_x, chunk_z), chunk_bytes in chunk_map.items():
region_x = chunk_x >> 5
region_z = chunk_z >> 5
local_x = chunk_x - (region_x << 5)
local_z = chunk_z - (region_z << 5)
index = local_x + local_z * 32
regions.setdefault((region_x, region_z), {})[index] = chunk_bytes
for (region_x, region_z), chunks in regions.items():
file_name = f"r.{region_x}.{region_z}.mca"
write_region_file(output_dir / file_name, chunks)
def parse_args():
parser = argparse.ArgumentParser(description="Export Minecraft chunks to MCA around spawn")
parser.add_argument("--radius", type=int, default=DEFAULT_RADIUS,
help="Export radius in blocks (default: %(default)s)")
parser.add_argument("--output", type=Path, default=Path("exports/mca"),
help="Destination directory for region files")
parser.add_argument("--center-x", type=int, default=None,
help="Override center X position (default: player spawn)")
parser.add_argument("--center-z", type=int, default=None,
help="Override center Z position (default: player spawn)")
parser.add_argument("--workers", type=int, default=os.cpu_count() or 1,
help="Number of worker threads (default: %(default)s)")
parser.add_argument("--minecraft-save", action="store_true",
help="Export to Minecraft saves folder (deletes old region files)")
return parser.parse_args()
def clean_minecraft_region_folder(region_path):
"""Delete all .mca files in the region folder."""
region_path = Path(region_path)
if not region_path.exists():
region_path.mkdir(parents=True, exist_ok=True)
return
mca_files = list(region_path.glob("*.mca"))
for mca_file in mca_files:
mca_file.unlink()
print(f"Deleted {mca_file}")
def process_chunk(args):
chunk_x, chunk_z = args
chunk_data = build_chunk(chunk_x, chunk_z)
chunk_bytes = assemble_chunk_bytes(chunk_data)
return chunk_x, chunk_z, chunk_bytes
def main():
args = parse_args()
center_x = args.center_x if args.center_x is not None else int(PLAYER_POS.x)
center_z = args.center_z if args.center_z is not None else int(PLAYER_POS.z)
radius = args.radius
# Determine output directory
if args.minecraft_save:
output_dir = Path.home() / ".minecraft" / "saves" / "New World (2)" / "region"
print(f"Cleaning Minecraft region folder: {output_dir}")
clean_minecraft_region_folder(output_dir)
else:
output_dir = args.output
chunk_x_min, chunk_x_max = chunk_range(center_x, radius)
chunk_z_min, chunk_z_max = chunk_range(center_z, radius)
chunk_coords = [
(chunk_x, chunk_z)
for chunk_z in range(chunk_z_min, chunk_z_max + 1)
for chunk_x in range(chunk_x_min, chunk_x_max + 1)
]
chunk_bytes_map = {}
total = len(chunk_coords)
if args.workers <= 1:
iterable = map(process_chunk, chunk_coords)
else:
executor = ThreadPoolExecutor(max_workers=args.workers)
futures = [executor.submit(process_chunk, coord) for coord in chunk_coords]
iterable = (future.result() for future in as_completed(futures))
try:
for processed, (chunk_x, chunk_z, chunk_bytes) in enumerate(iterable, start=1):
chunk_bytes_map[(chunk_x, chunk_z)] = chunk_bytes
if processed % 20 == 0 or processed == total:
print(f"Processed {processed}/{total} chunks")
finally:
if 'executor' in locals():
executor.shutdown(wait=True)
export_region_chunks(chunk_bytes_map, output_dir)
print(f"Export complete. Files written to {output_dir}")
if __name__ == "__main__":
main()