447 lines
14 KiB
Python
447 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
import math
|
|
import os
|
|
import struct
|
|
import time
|
|
import zlib
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
|
|
# Create a ``.numba_cache`` folder under the current working directory and use
# it as NUMBA_CACHE_DIR unless that variable is already set.
# NOTE(review): presumably this has to run before the project imports that
# follow so they see the variable -- confirm before moving it.
cache_dir = Path('.numba_cache').resolve()
cache_dir.mkdir(exist_ok=True)
os.environ.setdefault(
    "NUMBA_CACHE_DIR",
    str(cache_dir),
)
|
|
|
|
from mc import block_ids, worldGen
|
|
from settings import PLAYER_POS
|
|
|
|
|
|
# NBT tag type ids used by the writer helpers in this file.
TAG_END = 0
TAG_BYTE = 1
TAG_INT = 3
TAG_LONG = 4
TAG_STRING = 8
TAG_LIST = 9
TAG_COMPOUND = 10
TAG_INT_ARRAY = 11
TAG_LONG_ARRAY = 12

# NOTE(review): the published data-version table lists 2586 as Java Edition
# 1.16.5 and 2230 as 1.15.2 -- confirm which release this exporter targets,
# because the value written here and the comment below disagree.
DATA_VERSION = 2586  # Minecraft 1.15.2
DEFAULT_RADIUS = 256  # ~512 block diameter (~0.5 km)
CHUNK_HEIGHT = 256
SECTION_COUNT = CHUNK_HEIGHT // 16  # 16-block-tall sections per chunk column
BIOME_ID = 1  # Plains
|
|
|
|
|
|
def build_state_mapping():
    """Build the lookup from internal block ids to Minecraft block states.

    Returns:
        A dict keyed by the ids found in ``block_ids``.  Each value is a
        ``(block_name, properties)`` pair where ``properties`` is a tuple of
        ``(key, value)`` string pairs sorted by key (empty when the block has
        no properties).  Names that ``block_ids`` does not know are skipped.
    """
    # (internal name, Minecraft block name, block-state properties or None)
    known_states = (
        ("air", "minecraft:air", None),
        ("bedrock", "minecraft:bedrock", None),
        ("stone", "minecraft:stone", None),
        ("dirt", "minecraft:dirt", None),
        ("grass", "minecraft:grass_block", {"snowy": "false"}),
        ("water", "minecraft:water", {"level": "0"}),
        ("oak_log", "minecraft:oak_log", {"axis": "y"}),
        ("oak_leaves", "minecraft:oak_leaves", {"distance": "1", "persistent": "false"}),
        ("birch_log", "minecraft:birch_log", {"axis": "y"}),
        ("birch_leaves", "minecraft:birch_leaves", {"distance": "1", "persistent": "false"}),
    )

    mapping = {}
    for name, block_name, props in known_states:
        block_id = block_ids.get(name)
        if block_id is None:
            continue
        # A sorted tuple keeps the state hashable so it can serve as a palette key.
        mapping[block_id] = (block_name, tuple(sorted((props or {}).items())))
    return mapping
|
|
|
|
|
|
# Block id -> (block name, sorted property tuple), built once at import time.
BLOCK_STATE_LOOKUP = build_state_mapping()
# State used for any id missing from the lookup; raises KeyError at import
# time if "air" is not present in ``block_ids``.
DEFAULT_AIR_STATE = BLOCK_STATE_LOOKUP[
    block_ids["air"]
]
|
|
|
|
|
|
def to_signed_long(value):
    """Reinterpret the low 64 bits of *value* as a signed two's-complement int.

    Bits above bit 63 are discarded, so the result always lies in
    ``[-2**63, 2**63 - 1]``.
    """
    wrapped = value & 0xFFFFFFFFFFFFFFFF
    # Bit 63 set means the value is negative in two's complement.
    return wrapped - (1 << 64) if wrapped >> 63 else wrapped
|
|
|
|
|
|
def pack_bits(indices, bits_per_value):
    """Pack *indices* into 64-bit words, ``bits_per_value`` bits each.

    Values are laid out back to back starting at the least significant bit of
    the first word; a value that does not fit in the remaining bits of a word
    continues in the low bits of the next one.

    NOTE(review): this straddling layout looks like the pre-1.16 long-array
    packing; newer data versions are understood to pad each long instead --
    confirm against DATA_VERSION.

    Args:
        indices: sequence of non-negative ints, each expected to fit in
            ``bits_per_value`` bits.
        bits_per_value: width of every packed value in bits.

    Returns:
        A list of signed 64-bit ints (empty when *indices* is empty).
    """
    if not indices:
        return []

    word_count = (len(indices) * bits_per_value + 63) // 64
    words = [0] * word_count

    cursor = 0  # absolute bit position of the value being written
    for value in indices:
        word, shift = divmod(cursor, 64)
        words[word] |= value << shift

        overflow = shift + bits_per_value - 64
        if overflow > 0:
            # The top ``overflow`` bits did not fit; they start the next word.
            words[word + 1] |= value >> (64 - shift)
        cursor += bits_per_value

    # Masking to 64 bits and sign conversion both happen in to_signed_long.
    return list(map(to_signed_long, words))
|
|
|
|
|
|
def pack_heightmap(values):
    """Pack heightmap *values* into signed 64-bit words at 9 bits per entry."""
    packed = pack_bits(values, bits_per_value=9)
    return packed
|
|
|
|
|
|
def block_state_from_id(block_id):
    """Return the ``(name, properties)`` state for *block_id*.

    Ids that are not in ``BLOCK_STATE_LOOKUP`` resolve to the air state.
    """
    state = BLOCK_STATE_LOOKUP.get(block_id)
    # Stored states are tuples, so None can only mean "not mapped".
    return DEFAULT_AIR_STATE if state is None else state
|
|
|
|
|
|
def build_section(chunk_x, chunk_z, section_y, heightmap, tree_blocks):
    """Generate one 16x16x16 section of a chunk as palette + packed indices.

    Blocks are visited in y, then z, then x order.  A position present in
    *tree_blocks* takes precedence; otherwise the block comes from
    ``worldGen.generateBlock``.

    Args:
        chunk_x: chunk X coordinate (block X divided by 16).
        chunk_z: chunk Z coordinate (block Z divided by 16).
        section_y: vertical section index within the chunk.
        heightmap: 16x16 list indexed ``[x][z]``; updated IN PLACE so each
            entry holds at least ``y + 1`` of the highest non-air block seen.
        tree_blocks: mapping ``(x, y, z) -> block id`` in world coordinates.

    Returns:
        ``None`` when the section contains only air, otherwise a dict with
        the keys ``Y``, ``Palette``, ``BlockStates``, ``BlockLight`` and
        ``SkyLight`` (both light arrays are 2048 zero bytes).
    """
    base_x = chunk_x * 16
    base_y = section_y * 16
    base_z = chunk_z * 16

    palette = []
    slot_of = {}
    block_indices = []
    has_blocks = False

    # Layers at or above CHUNK_HEIGHT are not generated.
    for global_y in range(base_y, min(base_y + 16, CHUNK_HEIGHT)):
        top = global_y + 1
        for z in range(16):
            for x in range(16):
                pos = (base_x + x, global_y, base_z + z)
                block_id = tree_blocks.get(pos)
                if block_id is None:
                    block_id = worldGen.generateBlock(*pos)
                state = block_state_from_id(block_id)

                slot = slot_of.get(state)
                if slot is None:
                    slot = slot_of[state] = len(palette)
                    palette.append(state)
                block_indices.append(slot)

                if state[0] == "minecraft:air":
                    continue
                has_blocks = True
                if top > heightmap[x][z]:
                    heightmap[x][z] = top

    if not has_blocks:
        return None

    palette_entries = [
        {"Name": name, "Properties": dict(props)} if props else {"Name": name}
        for name, props in palette
    ]

    # Indices use at least 4 bits, more only when the palette needs it.
    palette_len = max(1, len(palette_entries))
    if palette_len > 1:
        bits = max(4, math.ceil(math.log2(palette_len)))
    else:
        bits = 4

    block_states = pack_bits(block_indices, bits) if block_indices else []
    if not block_states:
        block_states = [0]

    return {
        "Y": section_y,
        "Palette": palette_entries,
        "BlockStates": block_states,
        "BlockLight": bytearray(2048),
        "SkyLight": bytearray(2048),
    }
|
|
|
|
|
|
def build_biome_array():
    """Return a fresh list of 256 copies of ``BIOME_ID``.

    NOTE(review): 256 entries is one per block column; chunk formats from
    1.15 onward are understood to store 1024 biome entries -- confirm
    against the targeted data version.
    """
    return [BIOME_ID for _ in range(256)]
|
|
|
|
|
|
def build_chunk(chunk_x, chunk_z):
    """Generate every non-empty section of the chunk at (*chunk_x*, *chunk_z*).

    Returns:
        A dict with ``chunk_x``, ``chunk_z``, ``sections`` (all-air sections
        are omitted), ``heightmap`` (256 values flattened with z as the outer
        index and x as the inner one) and ``biomes``.
    """
    heightmap = [[0] * 16 for _ in range(16)]
    tree_blocks = worldGen.generate_chunk_trees(chunk_x, chunk_z)

    # build_section fills ``heightmap`` as a side effect, bottom section first.
    candidates = (
        build_section(chunk_x, chunk_z, section_y, heightmap, tree_blocks)
        for section_y in range(SECTION_COUNT)
    )
    sections = [section for section in candidates if section]

    return {
        "chunk_x": chunk_x,
        "chunk_z": chunk_z,
        "sections": sections,
        "heightmap": [heightmap[x][z] for z in range(16) for x in range(16)],
        "biomes": build_biome_array(),
    }
|
|
|
|
|
|
def write_string(buffer, value):
    """Write *value* as a length-prefixed UTF-8 string.

    The prefix is the encoded byte length as an unsigned big-endian 16-bit
    integer, so ``struct.error`` is raised for strings over 65535 bytes.
    """
    payload = value.encode('utf-8')
    length_prefix = struct.pack('>H', len(payload))
    buffer.write(length_prefix)
    buffer.write(payload)
|
|
|
|
|
|
def write_tag_header(buffer, tag_type, name):
    """Write a named tag header: one type byte followed by the tag name."""
    type_byte = bytes((tag_type,))
    buffer.write(type_byte)
    write_string(buffer, name)
|
|
|
|
|
|
def write_byte(buffer, name, value):
    """Write a named TAG_Byte holding *value* as a signed 8-bit integer."""
    write_tag_header(buffer, TAG_BYTE, name)
    payload = struct.pack('>b', value)
    buffer.write(payload)
|
|
|
|
|
|
def write_int(buffer, name, value):
    """Write a named TAG_Int holding *value* as a signed big-endian 32-bit integer."""
    write_tag_header(buffer, TAG_INT, name)
    payload = struct.pack('>i', value)
    buffer.write(payload)
|
|
|
|
|
|
def write_long(buffer, name, value):
    """Write a named TAG_Long holding *value* as a signed big-endian 64-bit integer."""
    write_tag_header(buffer, TAG_LONG, name)
    payload = struct.pack('>q', value)
    buffer.write(payload)
|
|
|
|
|
|
def write_string_tag(buffer, name, value):
    """Write a named TAG_String whose payload is the length-prefixed *value*."""
    for write_part, argument in (
        (lambda text: write_tag_header(buffer, TAG_STRING, text), name),
        (lambda text: write_string(buffer, text), value),
    ):
        write_part(argument)
|
|
|
|
|
|
def write_int_array(buffer, name, values):
    """Write a named TAG_Int_Array: a 32-bit count, then each value as a 32-bit int."""
    pack_int = struct.Struct('>i').pack
    write_tag_header(buffer, TAG_INT_ARRAY, name)
    buffer.write(pack_int(len(values)))
    for element in values:
        buffer.write(pack_int(element))
|
|
|
|
|
|
def write_long_array(buffer, name, values):
    """Write a named TAG_Long_Array: a 32-bit count, then each value as a 64-bit int.

    Every element goes through ``to_signed_long`` first, so unsigned 64-bit
    bit patterns are accepted as well as already-signed values.
    """
    pack_long = struct.Struct('>q').pack
    write_tag_header(buffer, TAG_LONG_ARRAY, name)
    buffer.write(struct.pack('>i', len(values)))
    for element in values:
        buffer.write(pack_long(to_signed_long(element)))
|
|
|
|
|
|
# NBT tag id for byte arrays, named like the other TAG_* constants instead of
# appearing as a bare literal inside the writer.
TAG_BYTE_ARRAY = 7


def write_byte_array(buffer, name, data):
    """Write a named TAG_Byte_Array.

    Args:
        buffer: binary stream with a ``write`` method.
        name: tag name.
        data: bytes-like object; its length is written as a signed
            big-endian 32-bit count followed by the raw bytes.
    """
    write_tag_header(buffer, TAG_BYTE_ARRAY, name)
    buffer.write(struct.pack('>i', len(data)))
    buffer.write(data)
|
|
|
|
|
|
def start_compound(buffer, name):
    """Open a named TAG_Compound; the caller closes it with ``end_compound``."""
    return write_tag_header(buffer, TAG_COMPOUND, name) and None
|
|
|
|
|
|
def end_compound(buffer):
    """Close the current compound by writing a single TAG_End byte."""
    terminator = bytes((TAG_END,))
    buffer.write(terminator)
|
|
|
|
|
|
def write_list(buffer, name, tag_type, items, payload_writer=None):
    """Write a named TAG_List.

    Args:
        buffer: binary stream with a ``write`` method.
        name: tag name.
        tag_type: NBT type id of the list elements (ignored for empty lists).
        items: sized sequence of elements.
        payload_writer: callable ``(buffer, item)`` that writes one element's
            payload; may be omitted only when *items* is empty.

    Raises:
        TypeError: if *items* is non-empty and *payload_writer* is not
            callable.  The check runs before anything is written so the
            buffer is never left holding a half-written list.
    """
    if items and not callable(payload_writer):
        raise TypeError("payload_writer must be callable when items is non-empty")

    write_tag_header(buffer, TAG_LIST, name)
    if not items:
        # An empty list is stored with element type TAG_End and length zero.
        buffer.write(bytes([TAG_END]))
        buffer.write(struct.pack('>i', 0))
        return

    buffer.write(bytes([tag_type]))
    buffer.write(struct.pack('>i', len(items)))
    for item in items:
        payload_writer(buffer, item)
|
|
|
|
|
|
def write_palette_entry(buffer, entry):
    """Write one palette entry as the payload of an unnamed compound.

    Emits the ``Name`` string, an optional ``Properties`` compound of string
    tags (skipped when absent or empty), then the terminating TAG_End byte.
    """
    write_string_tag(buffer, "Name", entry["Name"])

    properties = entry.get("Properties")
    if properties:
        start_compound(buffer, "Properties")
        for prop_name, prop_value in properties.items():
            write_string_tag(buffer, prop_name, prop_value)
        end_compound(buffer)

    # Terminates the entry itself.
    end_compound(buffer)
|
|
|
|
|
|
def write_section(buffer, section):
    """Write one section dict as the payload of an unnamed compound.

    Tags are emitted in the order Y, BlockStates, Palette, BlockLight,
    SkyLight, followed by the terminating TAG_End byte.
    """
    write_byte(buffer, "Y", section["Y"])
    write_long_array(buffer, "BlockStates", section["BlockStates"])
    write_list(
        buffer,
        "Palette",
        TAG_COMPOUND,
        section["Palette"],
        write_palette_entry,
    )
    for light_key in ("BlockLight", "SkyLight"):
        write_byte_array(buffer, light_key, section[light_key])
    end_compound(buffer)
|
|
|
|
|
|
def chunk_to_nbt(chunk):
    """Serialise a chunk dict to uncompressed NBT bytes.

    *chunk* must provide ``chunk_x``, ``chunk_z``, ``heightmap_packed``,
    ``biomes`` and ``sections``.  The result is a root compound holding
    ``DataVersion`` and a ``Level`` compound.

    NOTE(review): ``isLightOn`` is written as 1 while the section light
    arrays are all zero; presumably the game then trusts the zero light
    instead of recomputing it -- confirm in-game.
    """
    out = BytesIO()

    start_compound(out, "")
    write_int(out, "DataVersion", DATA_VERSION)

    start_compound(out, "Level")
    write_string_tag(out, "Status", "full")
    for long_name in ("InhabitedTime", "LastUpdate"):
        write_long(out, long_name, 0)
    write_int(out, "xPos", chunk["chunk_x"])
    write_int(out, "zPos", chunk["chunk_z"])
    write_byte(out, "isLightOn", 1)

    start_compound(out, "Heightmaps")
    write_long_array(out, "MOTION_BLOCKING", chunk["heightmap_packed"])
    end_compound(out)

    write_int_array(out, "Biomes", chunk["biomes"])
    write_list(out, "Sections", TAG_COMPOUND, chunk["sections"], write_section)
    # These lists are always written empty.
    for empty_list in ("Entities", "TileEntities", "TileTicks"):
        write_list(out, empty_list, TAG_COMPOUND, [])
    end_compound(out)  # Level

    end_compound(out)  # root
    return out.getvalue()
|
|
|
|
|
|
def assemble_chunk_bytes(chunk):
    """Pack the chunk's heightmap and return the chunk serialised as NBT bytes.

    Side effect: stores the packed heightmap on *chunk* under
    ``"heightmap_packed"``.
    """
    packed = pack_heightmap(chunk["heightmap"])
    chunk.__setitem__("heightmap_packed", packed)
    return chunk_to_nbt(chunk)
|
|
|
|
|
|
def chunk_range(center, radius):
    """Return ``(first_chunk, last_chunk)`` covering ``center +/- radius`` blocks.

    Both bounds are inclusive chunk coordinates along one axis, rounded
    towards negative infinity so negative block positions map correctly.
    """
    lowest_block, highest_block = center - radius, center + radius
    return math.floor(lowest_block / 16), math.floor(highest_block / 16)
|
|
|
|
|
|
def write_region_file(path, chunks):
    """Write a region (``.mca``) file containing *chunks*.

    Layout: a 4096-byte location table, a 4096-byte timestamp table, then
    the chunk payloads, each padded to a multiple of 4096 bytes.  Every
    payload is a 4-byte big-endian length, a compression-type byte (2, for
    the zlib stream produced here) and the compressed data.

    Args:
        path: ``pathlib.Path`` of the file to create; missing parent
            directories are created.
        chunks: mapping ``slot index -> uncompressed chunk bytes`` with
            slot indices in ``range(1024)``.

    Raises:
        ValueError: if a slot index is outside ``range(1024)`` or a chunk
            needs more than 255 sectors, which a location entry's one-byte
            sector count cannot express.
    """
    header = bytearray(8192)
    body = bytearray()
    next_sector = 2  # sectors 0 and 1 hold the two header tables
    stamp = int(time.time())  # one timestamp shared by every chunk written

    for index, chunk_bytes in chunks.items():
        if not 0 <= index < 1024:
            # An out-of-range index would otherwise write outside the table.
            raise ValueError(f"chunk slot index {index} is outside 0..1023")

        compressed = zlib.compress(chunk_bytes)
        payload = struct.pack('>IB', len(compressed) + 1, 2) + compressed
        payload += b'\x00' * (-len(payload) % 4096)
        sector_count = len(payload) // 4096
        if sector_count > 255:
            raise ValueError(
                f"chunk in slot {index} needs {sector_count} sectors; "
                "a region entry can describe at most 255"
            )

        # Location entry: 3-byte sector offset followed by 1-byte sector count.
        struct.pack_into('>I', header, index * 4, (next_sector << 8) | sector_count)
        struct.pack_into('>I', header, 4096 + index * 4, stamp)

        body += payload
        next_sector += sector_count

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(header + body)
|
|
|
|
|
|
def export_region_chunks(chunk_map, output_dir):
    """Group chunks by region and write one ``r.<x>.<z>.mca`` file per region.

    Args:
        chunk_map: mapping ``(chunk_x, chunk_z) -> chunk bytes``.
        output_dir: ``pathlib.Path`` of the directory receiving the files.
    """
    regions = {}
    for (chunk_x, chunk_z), chunk_bytes in chunk_map.items():
        # A region spans 32x32 chunks: the high bits select the region, the
        # low five bits of each coordinate give the slot inside it.
        region_key = (chunk_x >> 5, chunk_z >> 5)
        slot = (chunk_x & 31) + (chunk_z & 31) * 32
        if region_key not in regions:
            regions[region_key] = {}
        regions[region_key][slot] = chunk_bytes

    for (region_x, region_z), chunks in regions.items():
        write_region_file(output_dir / f"r.{region_x}.{region_z}.mca", chunks)
|
|
|
|
|
|
def parse_args():
    """Parse the exporter's command-line options and return the namespace.

    Options: ``--radius``, ``--output``, ``--center-x``, ``--center-z``,
    ``--workers`` and the ``--minecraft-save`` flag.
    """
    parser = argparse.ArgumentParser(description="Export Minecraft chunks to MCA around spawn")
    add = parser.add_argument

    add("--radius", default=DEFAULT_RADIUS, type=int,
        help="Export radius in blocks (default: %(default)s)")
    add("--output", default=Path("exports/mca"), type=Path,
        help="Destination directory for region files")
    add("--center-x", default=None, type=int,
        help="Override center X position (default: player spawn)")
    add("--center-z", default=None, type=int,
        help="Override center Z position (default: player spawn)")
    add("--workers", default=os.cpu_count() or 1, type=int,
        help="Number of worker threads (default: %(default)s)")
    add("--minecraft-save", action="store_true",
        help="Export to Minecraft saves folder (deletes old region files)")

    return parser.parse_args()
|
|
|
|
|
|
def clean_minecraft_region_folder(region_path):
    """Delete every ``*.mca`` file directly inside *region_path*.

    A missing folder is created (parents included) and left empty instead.
    Each removed file is reported on stdout; other files are left alone.
    """
    folder = Path(region_path)
    if folder.exists():
        # Materialise the listing before deleting anything from the folder.
        for stale in list(folder.glob("*.mca")):
            stale.unlink()
            print(f"Deleted {stale}")
    else:
        folder.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def process_chunk(args):
    """Generate and serialise one chunk.

    Args:
        args: ``(chunk_x, chunk_z)`` pair.

    Returns:
        ``(chunk_x, chunk_z, chunk_bytes)`` with the chunk's NBT bytes.
    """
    chunk_x, chunk_z = args
    return chunk_x, chunk_z, assemble_chunk_bytes(build_chunk(chunk_x, chunk_z))
|
|
|
|
|
|
def main():
    """Generate all chunks around the chosen centre and export them as region files.

    The centre defaults to ``PLAYER_POS`` and can be overridden from the
    command line.  Chunks are built serially when ``--workers`` is 1 or
    less, otherwise on a thread pool.  With ``--minecraft-save`` the files
    go to a fixed save folder under the user's home directory, and that
    folder's old ``.mca`` files are removed only after every chunk has been
    generated, so a failed run leaves the existing world untouched.
    """
    args = parse_args()
    center_x = args.center_x if args.center_x is not None else int(PLAYER_POS.x)
    center_z = args.center_z if args.center_z is not None else int(PLAYER_POS.z)
    radius = args.radius

    # Determine output directory
    if args.minecraft_save:
        # The save name and the ``.minecraft`` location are hard-coded.
        output_dir = Path.home() / ".minecraft" / "saves" / "New World (2)" / "region"
    else:
        output_dir = args.output

    chunk_x_min, chunk_x_max = chunk_range(center_x, radius)
    chunk_z_min, chunk_z_max = chunk_range(center_z, radius)

    chunk_coords = [
        (chunk_x, chunk_z)
        for chunk_z in range(chunk_z_min, chunk_z_max + 1)
        for chunk_x in range(chunk_x_min, chunk_x_max + 1)
    ]

    chunk_bytes_map = {}
    total = len(chunk_coords)

    executor = None
    futures = []
    try:
        if args.workers <= 1:
            iterable = map(process_chunk, chunk_coords)
        else:
            executor = ThreadPoolExecutor(max_workers=args.workers)
            futures = [executor.submit(process_chunk, coord) for coord in chunk_coords]
            iterable = (future.result() for future in as_completed(futures))

        for processed, (chunk_x, chunk_z, chunk_bytes) in enumerate(iterable, start=1):
            chunk_bytes_map[(chunk_x, chunk_z)] = chunk_bytes
            if processed % 20 == 0 or processed == total:
                print(f"Processed {processed}/{total} chunks")
    finally:
        if executor is not None:
            # No-op for finished work; when unwinding after an error this
            # drops the chunks still queued instead of generating them all.
            for future in futures:
                future.cancel()
            executor.shutdown(wait=True)

    if args.minecraft_save:
        # Destructive step, deliberately deferred until generation succeeded.
        print(f"Cleaning Minecraft region folder: {output_dir}")
        clean_minecraft_region_folder(output_dir)

    export_region_chunks(chunk_bytes_map, output_dir)
    print(f"Export complete. Files written to {output_dir}")
|
|
|
|
|
|
# Run the exporter only when this file is executed as a script.
if "__main__" == __name__:
    main()
|